Exploring Operating Systems

Day 53: Advanced Distributed Operating System Algorithms and Consensus Protocols

Table of Contents

  1. Introduction
  2. Distributed System Fundamentals
  3. Consensus Protocols
  4. Clock Synchronization
  5. Distributed Mutual Exclusion
  6. Leader Election Algorithms
  7. Distributed Transaction Management
  8. Fault Tolerance Mechanisms
  9. State Machine Replication
  10. Implementation Examples
  11. Performance Analysis
  12. Conclusion

1. Introduction

Distributed Operating System (DOS) algorithms are the backbone of modern distributed systems, enabling multiple nodes to work together seamlessly while maintaining consistency, fault tolerance, and performance. This article covers advanced distributed algorithms and coordination protocols, including Raft consensus, Lamport logical clocks, Ricart-Agrawala mutual exclusion, and Two-Phase Commit. These algorithms are essential for building reliable, scalable, and efficient distributed systems.

2. Distributed System Fundamentals

A distributed system consists of multiple nodes that communicate and coordinate to achieve a common goal. Each node in the system has its own state and interacts with other nodes through messages. The following code demonstrates the structure of a distributed node:

// Node structure in distributed system
struct distributed_node {
    uint64_t node_id;
    struct sockaddr_in address;
    enum node_state {
        NODE_ACTIVE,
        NODE_SUSPENDED,
        NODE_FAILED
    } state;
    struct list_head peers;
    pthread_mutex_t lock;
    struct timespec last_heartbeat;
};

// Initialize distributed node
int init_distributed_node(struct distributed_node* node, uint64_t id) {
    if (!node)
        return -EINVAL;
        
    node->node_id = id;
    INIT_LIST_HEAD(&node->peers);
    pthread_mutex_init(&node->lock, NULL);
    node->state = NODE_ACTIVE;
    clock_gettime(CLOCK_MONOTONIC, &node->last_heartbeat);
    
    return 0;
}

The distributed_node structure represents a node in the distributed system, including its ID, address, state, and list of peers. The init_distributed_node function initializes the node and sets its initial state to NODE_ACTIVE.
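
Nodes also need to know about each other. As a minimal sketch of how the peers list above might be populated, the following code registers one node as a peer of another. The peer_entry wrapper is introduced here for illustration, and list_add_tail assumes a userspace port of the kernel-style list helpers already used in this article:

// Hypothetical wrapper so a remote node can sit on the local peers list
struct peer_entry {
    struct distributed_node* node;
    struct list_head list;
};

// Register a remote node as a peer of the local node
int add_peer(struct distributed_node* local, struct distributed_node* remote) {
    struct peer_entry* entry = malloc(sizeof(*entry));
    if (!entry)
        return -ENOMEM;

    entry->node = remote;

    pthread_mutex_lock(&local->lock);
    list_add_tail(&entry->list, &local->peers);
    pthread_mutex_unlock(&local->lock);

    return 0;
}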

3. Consensus Protocols

Consensus protocols ensure that all nodes in a distributed system agree on a single value or state even when some nodes fail or messages are delayed. Raft is one of the most widely used consensus algorithms because it decomposes the problem into leader election, log replication, and safety.

The following code sketches the core Raft state and the RequestVote handler used during leader election:

// Raft state structure
struct raft_state {
    enum raft_role {
        FOLLOWER,
        CANDIDATE,
        LEADER
    } role;
    
    uint64_t current_term;
    uint64_t voted_for;
    struct list_head log_entries;
    uint64_t commit_index;
    uint64_t last_applied;
    
    // Leader-specific state
    struct {
        uint64_t* next_index;
        uint64_t* match_index;
    } leader_state;
    
    pthread_mutex_t state_lock;
};

// Raft log entry
struct log_entry {
    uint64_t term;
    uint64_t index;
    void* command;
    size_t command_len;
    struct list_head list;
};

// Initialize Raft state
int init_raft_state(struct raft_state* state) {
    state->role = FOLLOWER;
    state->current_term = 0;
    state->voted_for = UINT64_MAX;   // UINT64_MAX means "no vote cast this term"
    INIT_LIST_HEAD(&state->log_entries);
    state->commit_index = 0;
    state->last_applied = 0;
    
    // Leader-only arrays are allocated when this node wins an election
    state->leader_state.next_index = NULL;
    state->leader_state.match_index = NULL;
    
    pthread_mutex_init(&state->state_lock, NULL);
    
    return 0;
}

// Handle RequestVote RPC
int handle_request_vote(struct raft_state* state, 
                       struct request_vote_args* args,
                       struct request_vote_reply* reply) {
    pthread_mutex_lock(&state->state_lock);
    
    reply->vote_granted = 0;
    
    if (args->term < state->current_term) {
        // Reject requests from stale terms
        reply->term = state->current_term;
        pthread_mutex_unlock(&state->state_lock);
        return 0;
    }
    
    if (args->term > state->current_term) {
        // A newer term forces this node back to follower and clears its vote
        state->current_term = args->term;
        state->voted_for = UINT64_MAX;
        state->role = FOLLOWER;
    }
    
    // Report the (possibly updated) current term in the reply
    reply->term = state->current_term;
    
    if (state->voted_for == UINT64_MAX || 
        state->voted_for == args->candidate_id) {
        struct log_entry* last_entry = get_last_log_entry(state);
        if (!last_entry || 
            args->last_log_term > last_entry->term ||
            (args->last_log_term == last_entry->term && 
             args->last_log_index >= last_entry->index)) {
            state->voted_for = args->candidate_id;
            reply->vote_granted = 1;
        }
    }
    
    pthread_mutex_unlock(&state->state_lock);
    return 0;
}

The raft_state structure captures a node's role (follower, candidate, or leader), its current term, and its replicated log. The handle_request_vote function processes vote requests during leader election, granting a vote only if the node has not already voted for a different candidate in the term and the candidate's log is at least as up to date as its own.
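
Leader election is only half of Raft; the elected leader then replicates log entries (and sends empty heartbeats) through AppendEntries RPCs. The following is a simplified, heartbeat-only sketch of the receiving side. The append_entries_args and append_entries_reply structures and the reset_election_timer helper are assumptions of this sketch, and the full log-consistency check is omitted:

// Assumed argument and reply structures for the AppendEntries RPC
struct append_entries_args {
    uint64_t term;
    uint64_t leader_id;
    uint64_t prev_log_index;
    uint64_t prev_log_term;
    uint64_t leader_commit;
    // log entries omitted in this heartbeat-only sketch
};

struct append_entries_reply {
    uint64_t term;
    int success;
};

// Handle an AppendEntries heartbeat from the current leader
int handle_append_entries(struct raft_state* state,
                          struct append_entries_args* args,
                          struct append_entries_reply* reply) {
    pthread_mutex_lock(&state->state_lock);
    
    if (args->term < state->current_term) {
        // Stale leader: reject and report the newer term
        reply->term = state->current_term;
        reply->success = 0;
        pthread_mutex_unlock(&state->state_lock);
        return 0;
    }
    
    // A current or newer term means the sender is the legitimate leader
    if (args->term > state->current_term) {
        state->current_term = args->term;
        state->voted_for = UINT64_MAX;
    }
    state->role = FOLLOWER;
    reset_election_timer(state);   // assumed helper: postpone the next election
    
    // A full implementation would verify that the local entry at
    // prev_log_index carries prev_log_term before appending new entries;
    // this sketch only advances the commit index.
    if (args->leader_commit > state->commit_index)
        state->commit_index = args->leader_commit;
    
    reply->term = state->current_term;
    reply->success = 1;
    pthread_mutex_unlock(&state->state_lock);
    return 0;
}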

4. Clock Synchronization

Physical clocks on different nodes drift, so distributed algorithms usually rely on logical clocks to order events consistently rather than on synchronized wall-clock time. Lamport's logical clocks and vector clocks are the standard tools for this. The following code shows a Lamport clock implementation and the skeleton of a vector clock:

// Lamport clock structure
struct lamport_clock {
    atomic_uint_fast64_t timestamp;
    pthread_mutex_t lock;
};

// Initialize Lamport clock
void init_lamport_clock(struct lamport_clock* clock) {
    atomic_store(&clock->timestamp, 0);
    pthread_mutex_init(&clock->lock, NULL);
}

// Update Lamport clock
uint64_t lamport_tick(struct lamport_clock* clock) {
    return atomic_fetch_add(&clock->timestamp, 1) + 1;
}

// Synchronize with a received timestamp: take the maximum, then advance by one.
// A compare-and-swap loop avoids losing updates from concurrent senders.
void lamport_receive(struct lamport_clock* clock, uint64_t received_time) {
    uint_fast64_t current = atomic_load(&clock->timestamp);
    uint_fast64_t new_time;
    
    do {
        new_time = (current > received_time ? current : received_time) + 1;
    } while (!atomic_compare_exchange_weak(&clock->timestamp, &current, new_time));
}

// Vector clock implementation
struct vector_clock {
    uint64_t* timestamps;
    size_t num_processes;
    pthread_mutex_t lock;
};

// Initialize vector clock
int init_vector_clock(struct vector_clock* clock, size_t num_processes) {
    clock->timestamps = calloc(num_processes, sizeof(uint64_t));
    if (!clock->timestamps)
        return -ENOMEM;
        
    clock->num_processes = num_processes;
    pthread_mutex_init(&clock->lock, NULL);
    return 0;
}

The lamport_clock structure implements a scalar logical clock, while the vector_clock structure keeps one timestamp per process, which makes it possible to tell whether two events are causally related or concurrent. The lamport_tick and lamport_receive functions update the Lamport clock, and init_vector_clock allocates and initializes a vector clock.
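
The two vector-clock update rules are not shown above, so here is a minimal sketch of them: the local component is incremented on every local or send event, and a received vector is merged by taking the component-wise maximum before ticking the local component. The process_id parameter identifying the local process is an assumption of this sketch:

// Increment this process's component (on a local event or before sending)
uint64_t vector_tick(struct vector_clock* clock, size_t process_id) {
    uint64_t value;
    
    pthread_mutex_lock(&clock->lock);
    value = ++clock->timestamps[process_id];
    pthread_mutex_unlock(&clock->lock);
    
    return value;
}

// Merge a received vector: component-wise maximum, then tick the local component
void vector_receive(struct vector_clock* clock, size_t process_id,
                    const uint64_t* received) {
    pthread_mutex_lock(&clock->lock);
    
    for (size_t i = 0; i < clock->num_processes; i++) {
        if (received[i] > clock->timestamps[i])
            clock->timestamps[i] = received[i];
    }
    clock->timestamps[process_id]++;
    
    pthread_mutex_unlock(&clock->lock);
}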

5. Distributed Mutual Exclusion

Distributed mutual exclusion ensures that only one node can access a shared resource at a time. The Ricart-Agrawala algorithm solves this with timestamped requests: a node enters the critical section only after every other node has replied to its request. The following code shows the request and reply paths:

// Mutual exclusion request structure
struct mutex_request {
    uint64_t timestamp;
    uint64_t node_id;
    uint64_t resource_id;
};

// Distributed mutex structure
struct distributed_mutex {
    uint64_t resource_id;
    struct lamport_clock clock;
    struct mutex_request local_request;   // our outstanding request, if any
    struct list_head pending_requests;    // replies deferred until release
    bool requesting;
    bool has_lock;
    uint64_t reply_count;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};

// Request critical section
int request_critical_section(struct distributed_mutex* mutex) {
    struct mutex_request request;
    
    pthread_mutex_lock(&mutex->lock);
    
    request.timestamp = lamport_tick(&mutex->clock);
    request.node_id = get_local_node_id();
    request.resource_id = mutex->resource_id;
    
    // Remember our own request so incoming requests can be compared against it
    mutex->local_request = request;
    mutex->requesting = true;
    mutex->reply_count = 0;
    
    // Broadcast request to all other nodes
    broadcast_mutex_request(&request);
    
    // Enter the critical section once every other node has replied
    while (mutex->reply_count < get_total_nodes() - 1) {
        pthread_cond_wait(&mutex->cond, &mutex->lock);
    }
    
    mutex->requesting = false;
    mutex->has_lock = true;
    pthread_mutex_unlock(&mutex->lock);
    
    return 0;
}

// Handle mutex request
int handle_mutex_request(struct mutex_request* request) {
    struct distributed_mutex* mutex = find_mutex(request->resource_id);
    
    pthread_mutex_lock(&mutex->lock);
    
    // Advance the local logical clock past the sender's timestamp
    lamport_receive(&mutex->clock, request->timestamp);
    
    // Reply immediately unless we hold the lock, or we are also requesting
    // and our own request precedes the incoming one (lower timestamp wins,
    // with the node ID as tie-breaker inside compare_requests)
    if (!mutex->has_lock &&
        (!mutex->requesting ||
         compare_requests(request, &mutex->local_request) < 0)) {
        send_mutex_reply(request->node_id);
    } else {
        // Defer the reply until we leave the critical section
        add_pending_request(mutex, request);
    }
    
    pthread_mutex_unlock(&mutex->lock);
    
    return 0;
}

The distributed_mutex structure represents a distributed mutex, while request_critical_section and handle_mutex_request implement the request and reply sides of the Ricart-Agrawala algorithm: a node replies to an incoming request immediately unless it holds the lock or has an earlier outstanding request of its own.
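
The algorithm also needs a release step: when a node leaves the critical section it must reply to every request it deferred while holding the lock. A minimal sketch follows, assuming a pending_request wrapper is what add_pending_request places on the pending_requests list:

// Hypothetical wrapper used to queue deferred requests
struct pending_request {
    struct mutex_request request;
    struct list_head list;
};

// Release the critical section and answer all deferred requests
int release_critical_section(struct distributed_mutex* mutex) {
    struct pending_request *pending, *tmp;
    
    pthread_mutex_lock(&mutex->lock);
    
    mutex->has_lock = false;
    
    list_for_each_entry_safe(pending, tmp, &mutex->pending_requests, list) {
        send_mutex_reply(pending->request.node_id);
        list_del(&pending->list);
        free(pending);
    }
    
    pthread_mutex_unlock(&mutex->lock);
    return 0;
}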

6. Leader Election Algorithms

Leader election algorithms ensure that a single node is chosen as the leader in a distributed system. The Bully algorithm elects the live node with the highest ID: a node that notices the leader is gone challenges all higher-ID nodes and wins only if none of them responds. The following code shows how an election is started:

// Election message types
enum election_message_type {
    ELECTION,
    ANSWER,
    COORDINATOR
};

// Election message structure
struct election_message {
    enum election_message_type type;
    uint64_t sender_id;
    uint64_t term;
};

// Election state structure
struct election_state {
    uint64_t current_leader;
    uint64_t current_term;
    bool election_in_progress;
    struct timespec election_timeout;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};

// Start election
int start_election(struct election_state* state) {
    pthread_mutex_lock(&state->lock);
    
    state->current_term++;
    state->election_in_progress = true;
    
    // Send election messages to higher-priority nodes
    struct election_message msg = {
        .type = ELECTION,
        .sender_id = get_local_node_id(),
        .term = state->current_term
    };
    
    broadcast_to_higher_nodes(&msg);
    
    // Wait for responses with timeout
    struct timespec timeout;
    clock_gettime(CLOCK_REALTIME, &timeout);
    timespec_add_ms(&timeout, ELECTION_TIMEOUT_MS);
    
    int ret = pthread_cond_timedwait(&state->cond, &state->lock, &timeout);
    
    if (ret == ETIMEDOUT) {
        // No higher-priority node answered: declare self the leader
        declare_leader(state);
    } else {
        // A higher-priority node answered; it takes over the election and
        // will announce the result with a COORDINATOR message
    }
    
    pthread_mutex_unlock(&state->lock);
    return 0;
}

The election_state structure holds a node's view of the current election, while the start_election function implements the initiation step of the Bully algorithm: the node challenges all higher-priority nodes and claims leadership only if none of them answers within the timeout.
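
Every node must also react to incoming election traffic, not just start elections. The following sketch handles the three message types: an ELECTION from a lower-ID node is answered (and the answering node launches its own election), an ANSWER stops this node from declaring itself leader, and a COORDINATOR message records the new leader. The send_election_message helper is an assumption of this sketch:

// React to incoming Bully-algorithm messages
int handle_election_message(struct election_state* state,
                            struct election_message* msg) {
    switch (msg->type) {
    case ELECTION:
        // Only a node with a higher ID answers and takes over the election
        if (get_local_node_id() > msg->sender_id) {
            struct election_message answer = {
                .type = ANSWER,
                .sender_id = get_local_node_id(),
                .term = msg->term
            };
            send_election_message(msg->sender_id, &answer);  // assumed helper
            start_election(state);   // a real system would run this asynchronously
        }
        break;
        
    case ANSWER:
        // A higher-ID node is alive; wake the waiter in start_election
        pthread_mutex_lock(&state->lock);
        pthread_cond_signal(&state->cond);
        pthread_mutex_unlock(&state->lock);
        break;
        
    case COORDINATOR:
        // Record the announced leader and end the election
        pthread_mutex_lock(&state->lock);
        state->current_leader = msg->sender_id;
        state->current_term = msg->term;
        state->election_in_progress = false;
        pthread_mutex_unlock(&state->lock);
        break;
    }
    
    return 0;
}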

7. Distributed Transaction Management

Distributed transaction management ensures that transactions are executed atomically across multiple nodes. The Two-Phase Commit (2PC) protocol is a common solution for this problem. The following code shows the coordinator side of the protocol:

// Transaction state
enum transaction_state {
    INIT,
    PREPARING,
    PREPARED,
    COMMITTING,
    COMMITTED,
    ABORTING,
    ABORTED
};

// Transaction coordinator
struct transaction_coordinator {
    uint64_t transaction_id;
    enum transaction_state state;
    struct list_head participants;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};

// Participant structure
struct transaction_participant {
    uint64_t node_id;
    enum transaction_state state;
    struct list_head list;
};

// Two-phase commit implementation
int two_phase_commit(struct transaction_coordinator* coord) {
    int ret;
    
    // Phase 1: Prepare - ask every participant to vote
    coord->state = PREPARING;
    ret = send_prepare_to_all(coord);
    if (ret != 0) {
        abort_transaction(coord);
        return ret;
    }
    
    // Wait for all prepare responses; any "no" vote or timeout aborts
    ret = wait_for_prepare_responses(coord);
    if (ret != 0) {
        abort_transaction(coord);
        return ret;
    }
    
    coord->state = PREPARED;
    
    // Phase 2: Commit
    coord->state = COMMITTING;
    ret = send_commit_to_all(coord);
    if (ret != 0) {
        // Handle partial commit scenario
        handle_partial_commit(coord);
        return ret;
    }
    
    coord->state = COMMITTED;
    return 0;
}

The transaction_coordinator structure represents the coordinator in the 2PC protocol, while the two_phase_commit function implements the coordinator side: the transaction commits only if every participant votes to prepare, and aborts otherwise.
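
On the other side of the protocol, each participant votes in phase one and then applies whatever the coordinator decides in phase two. A minimal sketch of the participant's prepare handling is shown below; can_prepare_transaction, persist_transaction_state, and send_vote are assumed node-local helpers of this sketch:

// Participant-side vote for phase one of 2PC
int handle_prepare_request(struct transaction_participant* participant,
                           uint64_t transaction_id) {
    participant->state = PREPARING;
    
    // Vote "no" if the local work for this transaction cannot be completed
    if (!can_prepare_transaction(transaction_id)) {
        participant->state = ABORTED;
        return send_vote(transaction_id, false);
    }
    
    // Persist the prepared state so the promise survives a crash, then
    // commit or abort only when the coordinator's decision arrives
    persist_transaction_state(transaction_id, PREPARED);
    participant->state = PREPARED;
    
    return send_vote(transaction_id, true);
}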

8. Fault Tolerance Mechanisms

Fault tolerance mechanisms ensure that a distributed system can continue operating even when some nodes fail. A common building block is a heartbeat-based failure detector, demonstrated by the following code:

// Fault detector structure
struct fault_detector {
    struct list_head monitored_nodes;
    pthread_t detector_thread;
    uint64_t heartbeat_interval;
    uint64_t failure_threshold;
    pthread_mutex_t lock;
};

// Node monitoring structure
struct monitored_node {
    uint64_t node_id;
    struct timespec last_heartbeat;
    bool suspected;
    struct list_head list;
};

// Fault detector thread
void* fault_detector_thread(void* arg) {
    struct fault_detector* detector = arg;
    struct timespec now;
    
    while (1) {
        pthread_mutex_lock(&detector->lock);
        clock_gettime(CLOCK_MONOTONIC, &now);
        
        struct monitored_node* node;
        list_for_each_entry(node, &detector->monitored_nodes, list) {
            if (timespec_diff_ms(&now, &node->last_heartbeat) > 
                detector->failure_threshold) {
                if (!node->suspected) {
                    node->suspected = true;
                    handle_node_failure(node->node_id);
                }
            }
        }
        
        pthread_mutex_unlock(&detector->lock);
        sleep_ms(detector->heartbeat_interval);
    }
    
    return NULL;
}

The fault_detector structure holds the list of monitored nodes and the detection parameters, while the fault_detector_thread function periodically scans that list and flags any node whose last heartbeat is older than the failure threshold.
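
The detector is only useful if heartbeats actually refresh last_heartbeat. A minimal sketch of the receiving path follows; find_monitored_node and handle_node_recovery are assumed helpers mirroring the ones used above:

// Record a heartbeat from a monitored node
void handle_heartbeat(struct fault_detector* detector, uint64_t node_id) {
    pthread_mutex_lock(&detector->lock);
    
    struct monitored_node* node = find_monitored_node(detector, node_id);
    if (node) {
        clock_gettime(CLOCK_MONOTONIC, &node->last_heartbeat);
        if (node->suspected) {
            // The node came back: withdraw the suspicion
            node->suspected = false;
            handle_node_recovery(node_id);
        }
    }
    
    pthread_mutex_unlock(&detector->lock);
}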

9. State Machine Replication

State machine replication keeps every node in the same state by having each replica apply the same deterministic commands in the same order. The following code demonstrates the apply path of such a state machine:

// State machine structure
struct state_machine {
    void* state;
    uint64_t last_applied;
    struct list_head command_log;
    pthread_mutex_t lock;
};

// Command structure
struct command {
    uint64_t sequence_number;
    void* data;
    size_t data_len;
    struct list_head list;
};

// Apply command to state machine
int apply_command(struct state_machine* sm, struct command* cmd) {
    pthread_mutex_lock(&sm->lock);
    
    if (cmd->sequence_number <= sm->last_applied) {
        pthread_mutex_unlock(&sm->lock);
        return 0;  // Already applied
    }
    
    // Check for gaps in sequence
    if (cmd->sequence_number > sm->last_applied + 1) {
        pthread_mutex_unlock(&sm->lock);
        return -EAGAIN;  // Need to wait for missing commands
    }
    
    // Apply command to state
    int ret = execute_command(sm->state, cmd);
    if (ret == 0) {
        sm->last_applied = cmd->sequence_number;
        add_to_command_log(sm, cmd);
    }
    
    pthread_mutex_unlock(&sm->lock);
    return ret;
}

The state_machine structure represents a replicated state machine, while the apply_command function applies commands strictly in sequence order, ignoring duplicates and refusing to skip over gaps, which is what keeps every replica's state identical.
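
The execute_command helper is left abstract above because it depends entirely on what service is being replicated. As an illustration only, the following sketch interprets each command as a put on a simple key-value state; the kv_command layout and the kv_put helper are assumptions of this example:

// Hypothetical command payload: a fixed-size key and value for a put operation
struct kv_command {
    char key[64];
    char value[192];
};

// One possible execute_command: apply a put to a key-value service state
int execute_command(void* state, struct command* cmd) {
    struct kv_command* payload = cmd->data;
    
    if (cmd->data_len < sizeof(*payload))
        return -EINVAL;
    
    // Every replica applies the same commands in the same order, so a
    // deterministic update like this keeps all copies of the state identical.
    return kv_put(state, payload->key, payload->value);   // assumed helper
}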

10. Implementation Examples

The following code demonstrates the initialization of a distributed system:

// Main distributed system initialization
int init_distributed_system(struct distributed_node* nodes, size_t num_nodes) {
    int ret;
    
    for (size_t i = 0; i < num_nodes; i++) {
        ret = init_distributed_node(&nodes[i], i);
        if (ret)
            return ret;
    }
    
    return 0;
}

The init_distributed_system function initializes each node in turn, using its array index as the node ID, and stops at the first failure.
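
A short usage sketch follows, wiring every node to every other node with the add_peer helper sketched in Section 2; the cluster size of three is arbitrary:

// Example: bring up a small, fully connected cluster
int start_example_cluster(void) {
    static struct distributed_node nodes[3];
    int ret = init_distributed_system(nodes, 3);
    if (ret)
        return ret;
    
    // Make every node a peer of every other node
    for (size_t i = 0; i < 3; i++) {
        for (size_t j = 0; j < 3; j++) {
            if (i != j)
                add_peer(&nodes[i], &nodes[j]);
        }
    }
    
    return 0;
}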

11. Performance Analysis

Performance analysis is essential for understanding the behavior of a distributed system. The following code demonstrates the implementation of performance monitoring:

// Performance monitoring structure
struct performance_metrics {
    atomic_uint_fast64_t total_requests;
    atomic_uint_fast64_t successful_requests;
    atomic_uint_fast64_t failed_requests;
    atomic_uint_fast64_t total_latency_ms;
    struct timespec start_time;
};

// Initialize performance monitoring
void init_performance_monitoring(struct performance_metrics* metrics) {
    atomic_store(&metrics->total_requests, 0);
    atomic_store(&metrics->successful_requests, 0);
    atomic_store(&metrics->failed_requests, 0);
    atomic_store(&metrics->total_latency_ms, 0);
    clock_gettime(CLOCK_MONOTONIC, &metrics->start_time);
}

// Record request metrics
void record_request_metrics(struct performance_metrics* metrics, 
                          bool success, uint64_t latency_ms) {
    atomic_fetch_add(&metrics->total_requests, 1);
    if (success) {
        atomic_fetch_add(&metrics->successful_requests, 1);
        atomic_fetch_add(&metrics->total_latency_ms, latency_ms);
    } else {
        atomic_fetch_add(&metrics->failed_requests, 1);
    }
}

The performance_metrics structure keeps atomic counters for request totals and cumulative latency, while the record_request_metrics function updates them for each completed request.
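
To turn the raw counters into something readable, a small reporting sketch can derive the success rate, the mean latency of successful requests, and the request rate since start_time; timespec_diff_ms is the same assumed helper used by the fault detector:

// Print a summary derived from the collected counters
void report_performance_metrics(struct performance_metrics* metrics) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    
    uint64_t total = atomic_load(&metrics->total_requests);
    uint64_t ok = atomic_load(&metrics->successful_requests);
    uint64_t failed = atomic_load(&metrics->failed_requests);
    uint64_t latency = atomic_load(&metrics->total_latency_ms);
    uint64_t elapsed_ms = timespec_diff_ms(&now, &metrics->start_time);
    
    double success_rate = total ? 100.0 * (double)ok / (double)total : 0.0;
    double avg_latency = ok ? (double)latency / (double)ok : 0.0;
    double throughput = elapsed_ms ? 1000.0 * (double)total / (double)elapsed_ms : 0.0;
    
    printf("requests: %llu total, %llu ok, %llu failed\n",
           (unsigned long long)total, (unsigned long long)ok,
           (unsigned long long)failed);
    printf("success rate: %.1f%%, average latency: %.1f ms, throughput: %.1f req/s\n",
           success_rate, avg_latency, throughput);
}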

12. Conclusion

Distributed Operating System algorithms and consensus protocols are essential for building reliable, scalable, and efficient distributed systems. This article has covered the fundamental concepts, implementation details, and best practices for advanced distributed algorithms, including Raft, Lamport clocks, Ricart-Agrawala mutual exclusion, and Two-Phase Commit. By following the techniques and patterns discussed in this article, developers can create robust distributed systems that meet the demands of modern applications.