项目6:多集群资源聚合器

项目6:多集群资源聚合器

项目背景

企业通常运行多个 Kubernetes 集群(开发、测试、生产、多区域等),需要统一查看和管理:

  • 🌍 多区域集群资源分布
  • 📊 跨集群资源使用统计
  • 🔍 全局资源搜索
  • 📈 集群健康状态对比

解决方案
聚合多个集群的资源信息,提供统一的查询和展示界面。

功能需求

核心功能

  • ✅ 多集群配置管理
  • ✅ 资源聚合查询(Pod、Service、Deployment 等)
  • ✅ 跨集群资源搜索
  • ✅ 集群健康状态聚合
  • ✅ 统一 Dashboard

高级功能(规划中,下文示例代码尚未实现,详见文末“扩展方向”)

  • ⏳ 跨集群资源迁移
  • ⏳ 多集群负载均衡
  • ⏳ 联邦资源管理
  • ⏳ 成本对比分析

Go 完整实现

pkg/aggregator/aggregator.go

package aggregator

import (
    "context"
    "fmt"
    "sync"
    
    "github.com/sirupsen/logrus"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    
    "multi-cluster-aggregator/pkg/config"
)

// ClusterAggregator holds the registered clusters and runs queries against
// each of them, merging the results.
//
// NOTE(review): clusters is a plain map with no lock. AddCluster writes to it
// while the query methods range over it, so registering a cluster while
// queries are in flight looks unsafe — confirm all AddCluster calls happen
// before the API server starts serving.
type ClusterAggregator struct {
    // clusters is keyed by the name passed to AddCluster.
    clusters map[string]*ClusterClient
    logger   *logrus.Logger
}

// ClusterClient bundles the API client for one cluster with its descriptive
// metadata.
type ClusterClient struct {
    Name      string
    Region    string
    Env       string
    Clientset *kubernetes.Clientset
    // Healthy records the outcome of the connectivity probe AddCluster runs at
    // registration time. Nothing in this file refreshes it afterwards.
    Healthy   bool
}

// AggregatedPod is a flattened, cluster-qualified view of a single pod.
//
// The JSON tags give the snake_case keys that the bundled dashboard
// (static/index.html) reads, e.g. pod.cluster_name and pod.pod_ip. Without
// them encoding/json would emit the Go field names and the dashboard would
// render "undefined" in every cell.
type AggregatedPod struct {
    ClusterName string            `json:"cluster_name"`
    Namespace   string            `json:"namespace"`
    Name        string            `json:"name"`
    Status      string            `json:"status"`
    NodeName    string            `json:"node_name"`
    PodIP       string            `json:"pod_ip"`
    Labels      map[string]string `json:"labels"`
}

// AggregatedClusterInfo summarizes one cluster: identity, health and basic
// object counts and capacity.
//
// The JSON tags give the snake_case keys that the bundled dashboard
// (static/index.html) reads, e.g. cluster.node_count and
// cluster.cpu_capacity.
type AggregatedClusterInfo struct {
    ClusterName    string `json:"cluster_name"`
    Region         string `json:"region"`
    Env            string `json:"env"`
    Healthy        bool   `json:"healthy"`
    NodeCount      int    `json:"node_count"`
    PodCount       int    `json:"pod_count"`
    ServiceCount   int    `json:"service_count"`
    NamespaceCount int    `json:"namespace_count"`
    // CPUCapacity is the sum of node CPU capacity in millicores.
    CPUCapacity int64 `json:"cpu_capacity"`
    // MemoryCapacity is the sum of node memory capacity in bytes.
    MemoryCapacity int64 `json:"memory_capacity"`
    // CPUUsage and MemoryUsage are not populated by any code in this file.
    CPUUsage    int64 `json:"cpu_usage"`
    MemoryUsage int64 `json:"memory_usage"`
}

// NewClusterAggregator returns an aggregator with no clusters registered.
// Clusters are added afterwards through AddCluster. The cfg argument is
// accepted but not read here.
func NewClusterAggregator(cfg *config.Config, logger *logrus.Logger) *ClusterAggregator {
    agg := new(ClusterAggregator)
    agg.clusters = map[string]*ClusterClient{}
    agg.logger = logger
    return agg
}

// AddCluster builds a client from the kubeconfig file at the given path and
// registers it under name, replacing any cluster already registered with that
// name.
//
// An error is returned only when the kubeconfig cannot be loaded or the client
// cannot be constructed. A failed connectivity probe does not fail the call:
// the cluster is still registered, flagged as unhealthy, and the probe error
// is logged.
func (ca *ClusterAggregator) AddCluster(name, kubeconfig, region, env string) error {
    // Named restCfg so it does not shadow the imported config package.
    restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        return fmt.Errorf("failed to build config for cluster %s: %w", name, err)
    }

    clientset, err := kubernetes.NewForConfig(restCfg)
    if err != nil {
        return fmt.Errorf("failed to create clientset for cluster %s: %w", name, err)
    }

    // Connectivity probe: a one-item namespace list is enough to tell whether
    // the API server is reachable with these credentials.
    _, probeErr := clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{Limit: 1})
    healthy := probeErr == nil
    if !healthy {
        ca.logger.WithError(probeErr).Warnf("Connectivity check failed for cluster %s", name)
    }

    ca.clusters[name] = &ClusterClient{
        Name:      name,
        Region:    region,
        Env:       env,
        Clientset: clientset,
        Healthy:   healthy,
    }

    ca.logger.Infof("Added cluster: %s (region: %s, env: %s, healthy: %v)",
        name, region, env, healthy)

    return nil
}

// GetAllPods lists the pods of every healthy cluster concurrently and returns
// them as one slice.
//
// The operation is best-effort: a cluster whose list call fails is logged and
// left out, and the returned error is always nil. The result is never a nil
// slice, so it encodes to JSON as [] rather than null and clients can iterate
// it unconditionally.
func (ca *ClusterAggregator) GetAllPods() ([]AggregatedPod, error) {
    allPods := make([]AggregatedPod, 0)
    var mu sync.Mutex
    var wg sync.WaitGroup
    queried := 0

    for _, cluster := range ca.clusters {
        if !cluster.Healthy {
            continue
        }
        queried++

        wg.Add(1)
        go func(c *ClusterClient) {
            defer wg.Done()

            pods, err := c.Clientset.CoreV1().Pods("").List(
                context.TODO(),
                metav1.ListOptions{},
            )
            if err != nil {
                ca.logger.WithError(err).Errorf("Failed to list pods from cluster %s", c.Name)
                return
            }

            // Convert outside the lock so the critical section is one append.
            converted := make([]AggregatedPod, 0, len(pods.Items))
            for i := range pods.Items {
                pod := &pods.Items[i]
                converted = append(converted, AggregatedPod{
                    ClusterName: c.Name,
                    Namespace:   pod.Namespace,
                    Name:        pod.Name,
                    Status:      string(pod.Status.Phase),
                    NodeName:    pod.Spec.NodeName,
                    PodIP:       pod.Status.PodIP,
                    Labels:      pod.Labels,
                })
            }

            mu.Lock()
            allPods = append(allPods, converted...)
            mu.Unlock()
        }(cluster)
    }

    wg.Wait()

    // Report the clusters actually queried; unhealthy ones were skipped.
    ca.logger.Infof("Aggregated %d pods from %d clusters", len(allPods), queried)

    return allPods, nil
}

// SearchPods lists, concurrently across every healthy cluster, the pods that
// match labelSelector and returns them as one slice. The selector string is
// passed to the API server unchanged.
//
// The operation is best-effort: a cluster whose list call fails is logged and
// left out, and the returned error is always nil. The result is never a nil
// slice, so it encodes to JSON as [] rather than null.
func (ca *ClusterAggregator) SearchPods(labelSelector string) ([]AggregatedPod, error) {
    allPods := make([]AggregatedPod, 0)
    var mu sync.Mutex
    var wg sync.WaitGroup

    opts := metav1.ListOptions{LabelSelector: labelSelector}

    for _, cluster := range ca.clusters {
        if !cluster.Healthy {
            continue
        }

        wg.Add(1)
        go func(c *ClusterClient) {
            defer wg.Done()

            pods, err := c.Clientset.CoreV1().Pods("").List(context.TODO(), opts)
            if err != nil {
                ca.logger.WithError(err).Errorf("Failed to search pods in cluster %s", c.Name)
                return
            }

            // Convert outside the lock so the critical section is one append.
            matched := make([]AggregatedPod, 0, len(pods.Items))
            for i := range pods.Items {
                pod := &pods.Items[i]
                matched = append(matched, AggregatedPod{
                    ClusterName: c.Name,
                    Namespace:   pod.Namespace,
                    Name:        pod.Name,
                    Status:      string(pod.Status.Phase),
                    NodeName:    pod.Spec.NodeName,
                    PodIP:       pod.Status.PodIP,
                    Labels:      pod.Labels,
                })
            }

            mu.Lock()
            allPods = append(allPods, matched...)
            mu.Unlock()
        }(cluster)
    }

    wg.Wait()

    return allPods, nil
}

// GetClusterInfo gathers a summary of every registered cluster concurrently.
//
// Unhealthy clusters are reported with identity and health only. For healthy
// clusters each list call is independent: a failing call is logged and leaves
// its fields at zero, while the rest of the summary is still filled in. The
// returned error is always nil, and the result is never a nil slice, so it
// encodes to JSON as [] rather than null. Entries appear in goroutine
// completion order.
func (ca *ClusterAggregator) GetClusterInfo() ([]AggregatedClusterInfo, error) {
    clusterInfos := make([]AggregatedClusterInfo, 0, len(ca.clusters))
    var mu sync.Mutex
    var wg sync.WaitGroup

    for _, cluster := range ca.clusters {
        wg.Add(1)
        go func(c *ClusterClient) {
            defer wg.Done()

            info := AggregatedClusterInfo{
                ClusterName: c.Name,
                Region:      c.Region,
                Env:         c.Env,
                Healthy:     c.Healthy,
            }
            // Publish whatever was collected on every exit path. Deferred
            // calls run last-in-first-out, so this append happens before
            // wg.Done.
            defer func() {
                mu.Lock()
                clusterInfos = append(clusterInfos, info)
                mu.Unlock()
            }()

            if !c.Healthy {
                return
            }

            ctx := context.TODO()
            core := c.Clientset.CoreV1()

            // Nodes: count plus summed CPU (millicores) and memory (bytes).
            if nodes, err := core.Nodes().List(ctx, metav1.ListOptions{}); err != nil {
                ca.logger.WithError(err).Errorf("Failed to list nodes from cluster %s", c.Name)
            } else {
                info.NodeCount = len(nodes.Items)
                for i := range nodes.Items {
                    capacity := nodes.Items[i].Status.Capacity
                    if cpu, ok := capacity[corev1.ResourceCPU]; ok {
                        info.CPUCapacity += cpu.MilliValue()
                    }
                    if mem, ok := capacity[corev1.ResourceMemory]; ok {
                        info.MemoryCapacity += mem.Value()
                    }
                }
            }

            // Pods
            if pods, err := core.Pods("").List(ctx, metav1.ListOptions{}); err != nil {
                ca.logger.WithError(err).Errorf("Failed to list pods from cluster %s", c.Name)
            } else {
                info.PodCount = len(pods.Items)
            }

            // Services
            if services, err := core.Services("").List(ctx, metav1.ListOptions{}); err != nil {
                ca.logger.WithError(err).Errorf("Failed to list services from cluster %s", c.Name)
            } else {
                info.ServiceCount = len(services.Items)
            }

            // Namespaces
            if namespaces, err := core.Namespaces().List(ctx, metav1.ListOptions{}); err != nil {
                ca.logger.WithError(err).Errorf("Failed to list namespaces from cluster %s", c.Name)
            } else {
                info.NamespaceCount = len(namespaces.Items)
            }
        }(cluster)
    }

    wg.Wait()

    return clusterInfos, nil
}

// GetPodDistribution returns, for each healthy cluster, the number of pods in
// each namespace, keyed by cluster name and then namespace.
//
// Clusters are queried one after another. A cluster that is unhealthy or whose
// list call fails has no entry in the result; the failure is logged so a
// missing cluster can be told apart from one with no pods.
func (ca *ClusterAggregator) GetPodDistribution() map[string]map[string]int {
    distribution := make(map[string]map[string]int)

    for _, cluster := range ca.clusters {
        if !cluster.Healthy {
            continue
        }

        pods, err := cluster.Clientset.CoreV1().Pods("").List(
            context.TODO(),
            metav1.ListOptions{},
        )
        if err != nil {
            ca.logger.WithError(err).Errorf("Failed to list pods from cluster %s", cluster.Name)
            continue
        }

        perNamespace, exists := distribution[cluster.Name]
        if !exists {
            perNamespace = make(map[string]int)
            distribution[cluster.Name] = perNamespace
        }

        for i := range pods.Items {
            perNamespace[pods.Items[i].Namespace]++
        }
    }

    return distribution
}

// CompareResources reshapes the output of GetClusterInfo into a map keyed by
// cluster name. Each value is a map holding that cluster's region, env,
// health flag, node/pod/service counts and CPU/memory capacity.
func (ca *ClusterAggregator) CompareResources() map[string]interface{} {
    // GetClusterInfo always returns a nil error, so it is dropped here.
    infos, _ := ca.GetClusterInfo()

    result := map[string]interface{}{}
    for i := range infos {
        ci := &infos[i]

        entry := make(map[string]interface{})
        entry["region"] = ci.Region
        entry["env"] = ci.Env
        entry["healthy"] = ci.Healthy
        entry["nodes"] = ci.NodeCount
        entry["pods"] = ci.PodCount
        entry["services"] = ci.ServiceCount
        entry["cpu_capacity"] = ci.CPUCapacity
        entry["memory_capacity"] = ci.MemoryCapacity

        result[ci.ClusterName] = entry
    }

    return result
}

pkg/server/server.go

package server

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "github.com/gorilla/mux"
    "github.com/sirupsen/logrus"

    "multi-cluster-aggregator/pkg/aggregator"
)

// APIServer exposes a ClusterAggregator over HTTP.
type APIServer struct {
    aggregator *aggregator.ClusterAggregator
    logger     *logrus.Logger
}

// NewAPIServer wires an aggregator and a logger into an APIServer. The server
// does not listen until Start is called.
func NewAPIServer(agg *aggregator.ClusterAggregator, logger *logrus.Logger) *APIServer {
    srv := new(APIServer)
    srv.aggregator = agg
    srv.logger = logger
    return srv
}

// Start registers the API routes and the static file handler, then serves
// HTTP on the given port. It blocks until the server stops and returns the
// error from ListenAndServe.
func (s *APIServer) Start(port int) error {
    r := mux.NewRouter()

    // API routes
    r.HandleFunc("/api/v1/pods", s.handleGetAllPods).Methods("GET")
    r.HandleFunc("/api/v1/pods/search", s.handleSearchPods).Methods("GET")
    r.HandleFunc("/api/v1/clusters", s.handleGetClusters).Methods("GET")
    r.HandleFunc("/api/v1/distribution", s.handleGetDistribution).Methods("GET")
    r.HandleFunc("/api/v1/comparison", s.handleGetComparison).Methods("GET")

    // Static files. Registered last so the catch-all prefix does not shadow
    // the API routes above.
    r.PathPrefix("/").Handler(http.FileServer(http.Dir("./static")))

    addr := fmt.Sprintf(":%d", port)
    s.logger.Infof("Starting API server on %s", addr)

    // http.ListenAndServe uses a server with no timeouts, so a client that
    // never finishes sending headers can hold a connection open forever.
    // WriteTimeout is left unset on purpose: aggregation across clusters can
    // legitimately take a long time.
    srv := &http.Server{
        Addr:              addr,
        Handler:           r,
        ReadHeaderTimeout: 10 * time.Second,
        IdleTimeout:       120 * time.Second,
    }

    return srv.ListenAndServe()
}

// handleGetAllPods responds with the pods of all clusters as JSON, or with a
// 500 JSON error if the aggregator reports one.
func (s *APIServer) handleGetAllPods(w http.ResponseWriter, r *http.Request) {
    allPods, err := s.aggregator.GetAllPods()
    if err == nil {
        s.sendJSON(w, allPods)
        return
    }

    s.sendError(w, err, http.StatusInternalServerError)
}

// handleSearchPods reads the "labels" query parameter, uses it as the label
// selector for a cross-cluster pod search, and responds with the matches as
// JSON, or with a 500 JSON error if the aggregator reports one.
func (s *APIServer) handleSearchPods(w http.ResponseWriter, r *http.Request) {
    matches, err := s.aggregator.SearchPods(r.URL.Query().Get("labels"))
    if err == nil {
        s.sendJSON(w, matches)
        return
    }

    s.sendError(w, err, http.StatusInternalServerError)
}

// handleGetClusters responds with the per-cluster summaries as JSON, or with
// a 500 JSON error if the aggregator reports one.
func (s *APIServer) handleGetClusters(w http.ResponseWriter, r *http.Request) {
    summaries, err := s.aggregator.GetClusterInfo()
    if err == nil {
        s.sendJSON(w, summaries)
        return
    }

    s.sendError(w, err, http.StatusInternalServerError)
}

// handleGetDistribution responds with the per-cluster, per-namespace pod
// counts as JSON.
func (s *APIServer) handleGetDistribution(w http.ResponseWriter, r *http.Request) {
    s.sendJSON(w, s.aggregator.GetPodDistribution())
}

// handleGetComparison responds with the cluster-by-cluster resource
// comparison as JSON.
func (s *APIServer) handleGetComparison(w http.ResponseWriter, r *http.Request) {
    s.sendJSON(w, s.aggregator.CompareResources())
}

// sendJSON writes data as a JSON response body with the default 200 status.
// By the time encoding fails the status line may already be on the wire, so
// the failure can only be logged, not reported to the client.
func (s *APIServer) sendJSON(w http.ResponseWriter, data interface{}) {
    w.Header().Set("Content-Type", "application/json")
    if err := json.NewEncoder(w).Encode(data); err != nil {
        s.logger.WithError(err).Error("Failed to write JSON response")
    }
}

// sendError writes a JSON body of the form {"error": "<message>"} with the
// given HTTP status. A failure to write the body is logged, since the status
// has already been sent by then.
func (s *APIServer) sendError(w http.ResponseWriter, err error, status int) {
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(status)

    body := map[string]string{"error": err.Error()}
    if encErr := json.NewEncoder(w).Encode(body); encErr != nil {
        s.logger.WithError(encErr).Error("Failed to write JSON error response")
    }
}

Python 实现

multi_cluster_aggregator.py

#!/usr/bin/env python3

import concurrent.futures
from typing import List, Dict
from dataclasses import dataclass
from kubernetes import client, config
from flask import Flask, jsonify, request

@dataclass
class ClusterConfig:
    """Connection settings and descriptive metadata for one cluster."""
    # Name the cluster is registered under and reported as.
    name: str
    # Path of the kubeconfig file used to build the cluster's API client.
    kubeconfig: str
    # Region and environment are carried through unchanged into the cluster info output.
    region: str
    env: str

@dataclass
class AggregatedPod:
    """Flattened, cluster-qualified view of a single pod."""
    # Name of the cluster the pod was listed from.
    cluster_name: str
    namespace: str
    name: str
    # Pod phase as reported in the pod status.
    status: str
    # Empty string when the pod has no node or IP in its spec/status.
    node_name: str
    pod_ip: str
    labels: Dict[str, str]

class MultiClusterAggregator:
    """Queries several Kubernetes clusters and merges the results.

    Clusters are registered with add_cluster(). Every query method is
    best-effort: a cluster that fails is reported on stdout and left out of
    (or flagged in) the result instead of raising.
    """

    # Multipliers for Kubernetes quantity suffixes. Binary suffixes are tried
    # first so that "Mi" is never mistaken for the decimal "M".
    _BINARY_UNITS = {
        'Ki': 1024, 'Mi': 1024**2, 'Gi': 1024**3,
        'Ti': 1024**4, 'Pi': 1024**5, 'Ei': 1024**6,
    }
    _DECIMAL_UNITS = {
        'k': 1000, 'M': 1000**2, 'G': 1000**3,
        'T': 1000**4, 'P': 1000**5, 'E': 1000**6,
    }

    def __init__(self):
        # CoreV1Api client and static metadata per cluster, both keyed by
        # cluster name.
        self.clusters: Dict[str, "client.CoreV1Api"] = {}
        self.cluster_info: Dict[str, "ClusterConfig"] = {}

    def add_cluster(self, cluster_config: "ClusterConfig"):
        """Register a cluster.

        Builds a dedicated API client from the cluster's kubeconfig and probes
        it with a one-item namespace list. If anything fails the cluster is
        not registered and the error is printed.
        """
        try:
            # new_client_from_config returns an ApiClient bound to this
            # kubeconfig. load_kube_config returns None and only changes the
            # process-wide default configuration, so it is not used here.
            api_client = config.new_client_from_config(
                config_file=cluster_config.kubeconfig
            )
            v1 = client.CoreV1Api(api_client)

            # Connectivity probe
            v1.list_namespace(limit=1)

            self.clusters[cluster_config.name] = v1
            self.cluster_info[cluster_config.name] = cluster_config

            print(f"Added cluster: {cluster_config.name} "
                  f"(region: {cluster_config.region}, env: {cluster_config.env})")

        except Exception as e:
            print(f"Failed to add cluster {cluster_config.name}: {e}")

    @staticmethod
    def _to_aggregated_pods(cluster_name: str, pod_list) -> "List[AggregatedPod]":
        """Convert an API pod list into AggregatedPod records."""
        return [
            AggregatedPod(
                cluster_name=cluster_name,
                namespace=pod.metadata.namespace,
                name=pod.metadata.name,
                status=pod.status.phase,
                node_name=pod.spec.node_name or "",
                pod_ip=pod.status.pod_ip or "",
                labels=pod.metadata.labels or {}
            )
            for pod in pod_list.items
        ]

    def get_all_pods(self) -> "List[AggregatedPod]":
        """Return the pods of every registered cluster, fetched in parallel."""
        all_pods = []

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(self._get_cluster_pods, name, v1): name
                for name, v1 in self.clusters.items()
            }

            for future in concurrent.futures.as_completed(futures):
                cluster_name = futures[future]
                try:
                    all_pods.extend(future.result())
                except Exception as e:
                    print(f"Failed to get pods from {cluster_name}: {e}")

        print(f"Aggregated {len(all_pods)} pods from {len(self.clusters)} clusters")

        return all_pods

    def _get_cluster_pods(self, cluster_name: str, v1: "client.CoreV1Api") -> "List[AggregatedPod]":
        """Return all pods of one cluster; an empty list if the call fails."""
        try:
            return self._to_aggregated_pods(
                cluster_name, v1.list_pod_for_all_namespaces()
            )
        except Exception as e:
            print(f"Error getting pods from {cluster_name}: {e}")
            return []

    def search_pods(self, label_selector: str) -> "List[AggregatedPod]":
        """Return the pods matching label_selector across all clusters."""
        all_pods = []

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(
                    self._search_cluster_pods,
                    name, v1, label_selector
                ): name
                for name, v1 in self.clusters.items()
            }

            for future in concurrent.futures.as_completed(futures):
                try:
                    all_pods.extend(future.result())
                except Exception as e:
                    print(f"Search error: {e}")

        return all_pods

    def _search_cluster_pods(
        self,
        cluster_name: str,
        v1: "client.CoreV1Api",
        label_selector: str
    ) -> "List[AggregatedPod]":
        """Return the matching pods of one cluster; an empty list if the call fails."""
        try:
            return self._to_aggregated_pods(
                cluster_name,
                v1.list_pod_for_all_namespaces(label_selector=label_selector)
            )
        except Exception as e:
            print(f"Search error in {cluster_name}: {e}")
            return []

    def get_cluster_info(self) -> List[Dict]:
        """Return one summary dict per registered cluster, fetched in parallel."""
        cluster_infos = []

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(self._get_single_cluster_info, name, v1): name
                for name, v1 in self.clusters.items()
            }

            for future in concurrent.futures.as_completed(futures):
                try:
                    cluster_infos.append(future.result())
                except Exception as e:
                    print(f"Error getting cluster info: {e}")

        return cluster_infos

    def _get_single_cluster_info(
        self,
        cluster_name: str,
        v1: "client.CoreV1Api"
    ) -> Dict:
        """Summarize one cluster.

        Returns a dict with identity, health flag, object counts,
        cpu_capacity in millicores and memory_capacity in bytes. If any API
        call or quantity parse fails, 'healthy' is set to False and the values
        collected so far are kept.
        """
        cfg = self.cluster_info[cluster_name]

        info = {
            'cluster_name': cluster_name,
            'region': cfg.region,
            'env': cfg.env,
            'healthy': True,
            'node_count': 0,
            'pod_count': 0,
            'service_count': 0,
            'namespace_count': 0,
            'cpu_capacity': 0,
            'memory_capacity': 0
        }

        try:
            # Nodes
            nodes = v1.list_node()
            info['node_count'] = len(nodes.items)

            for node in nodes.items:
                capacity = node.status.capacity
                if not capacity:
                    continue
                if 'cpu' in capacity:
                    info['cpu_capacity'] += self._parse_cpu(capacity['cpu'])
                if 'memory' in capacity:
                    info['memory_capacity'] += self._parse_memory(capacity['memory'])

            # Pods
            info['pod_count'] = len(v1.list_pod_for_all_namespaces().items)

            # Services
            info['service_count'] = len(v1.list_service_for_all_namespaces().items)

            # Namespaces
            info['namespace_count'] = len(v1.list_namespace().items)

        except Exception as e:
            print(f"Error getting info for {cluster_name}: {e}")
            info['healthy'] = False

        return info

    @staticmethod
    def _scale(number: str, multiplier: int) -> int:
        """Multiply a numeric string by multiplier, keeping integer inputs exact."""
        try:
            return int(number) * multiplier
        except ValueError:
            # Fractional or exponent notation such as "1.5" or "12e6".
            return int(float(number) * multiplier)

    def _parse_cpu(self, cpu_str: str) -> int:
        """Convert a CPU quantity to millicores.

        Accepts whole or fractional cores ("4", "0.5") and millicore
        quantities ("3920m"). Raises ValueError for anything else.
        """
        cpu_str = str(cpu_str).strip()
        if cpu_str.endswith('m'):
            return int(cpu_str[:-1])
        return self._scale(cpu_str, 1000)

    def _parse_memory(self, mem_str: str) -> int:
        """Convert a memory quantity to bytes.

        Accepts a plain byte count as well as binary (Ki..Ei), decimal (k..E)
        and milli (m) suffixes. Raises ValueError for anything else.
        """
        mem_str = str(mem_str).strip()

        for units in (self._BINARY_UNITS, self._DECIMAL_UNITS):
            for unit, multiplier in units.items():
                if mem_str.endswith(unit):
                    return self._scale(mem_str[:-len(unit)], multiplier)

        if mem_str.endswith('m'):
            # Milli-bytes; truncated to whole bytes.
            return int(float(mem_str[:-1]) / 1000)

        return self._scale(mem_str, 1)

    def get_pod_distribution(self) -> Dict[str, Dict[str, int]]:
        """Return pod counts per namespace for each cluster.

        Clusters are queried one after another. A cluster whose list call
        fails is reported and has no entry in the result.
        """
        distribution = {}

        for cluster_name, v1 in self.clusters.items():
            try:
                pods = v1.list_pod_for_all_namespaces()
            except Exception as e:
                print(f"Error getting distribution for {cluster_name}: {e}")
                continue

            per_namespace = {}
            for pod in pods.items:
                ns = pod.metadata.namespace
                per_namespace[ns] = per_namespace.get(ns, 0) + 1
            distribution[cluster_name] = per_namespace

        return distribution

# Flask API Server
# Module-level Flask application and the single aggregator instance that the
# route handlers in this module share.
app = Flask(__name__)
aggregator = MultiClusterAggregator()

@app.route('/api/v1/pods')
def get_all_pods():
    """Return every pod from every cluster as a JSON array of objects."""
    payload = []
    for pod in aggregator.get_all_pods():
        payload.append({
            'cluster_name': pod.cluster_name,
            'namespace': pod.namespace,
            'name': pod.name,
            'status': pod.status,
            'node_name': pod.node_name,
            'pod_ip': pod.pod_ip,
            'labels': pod.labels
        })
    return jsonify(payload)

@app.route('/api/v1/pods/search')
def search_pods():
    """Return the pods matching the 'labels' query parameter as a JSON array.

    The parameter is used as the label selector and defaults to an empty
    string. Each object carries the same fields as the /api/v1/pods response,
    so clients can render both results with the same code.
    """
    labels = request.args.get('labels', '')
    pods = aggregator.search_pods(labels)
    return jsonify([{
        'cluster_name': p.cluster_name,
        'namespace': p.namespace,
        'name': p.name,
        'status': p.status,
        'node_name': p.node_name,
        'pod_ip': p.pod_ip,
        'labels': p.labels
    } for p in pods])

@app.route('/api/v1/clusters')
def get_clusters():
    """Return the per-cluster summary dicts as a JSON array."""
    cluster_infos = aggregator.get_cluster_info()
    return jsonify(cluster_infos)

@app.route('/api/v1/distribution')
def get_distribution():
    """Return pod counts per cluster and namespace as a JSON object."""
    distribution = aggregator.get_pod_distribution()
    return jsonify(distribution)

if __name__ == '__main__':
    # Register the clusters to aggregate. A cluster that cannot be reached is
    # reported by add_cluster and skipped; the server still starts.
    aggregator.add_cluster(ClusterConfig(
        name='prod-us-east',
        kubeconfig='/path/to/prod-us-east.kubeconfig',
        region='us-east-1',
        env='production'
    ))

    aggregator.add_cluster(ClusterConfig(
        name='prod-eu-west',
        kubeconfig='/path/to/prod-eu-west.kubeconfig',
        region='eu-west-1',
        env='production'
    ))

    # Start the API server.
    # Debug mode stays off: the server binds to every interface, and Flask's
    # debug mode enables an interactive debugger that lets a remote client run
    # arbitrary Python code.
    app.run(host='0.0.0.0', port=8080, debug=False)

配置示例

clusters.yaml

# List of clusters to aggregate. Each entry gives the name the cluster is
# reported under, the path of its kubeconfig file, and its region and
# environment.
clusters:
- name: prod-us-east
  kubeconfig: /etc/kubeconfigs/prod-us-east.yaml
  region: us-east-1
  env: production
  
- name: prod-eu-west
  kubeconfig: /etc/kubeconfigs/prod-eu-west.yaml
  region: eu-west-1
  env: production
  
- name: dev-us-east
  kubeconfig: /etc/kubeconfigs/dev-us-east.yaml
  region: us-east-1
  env: development

API 使用示例

获取所有 Pods

curl http://localhost:8080/api/v1/pods

跨集群搜索

curl "http://localhost:8080/api/v1/pods/search?labels=app=nginx"

获取集群信息

curl http://localhost:8080/api/v1/clusters

获取 Pod 分布

curl http://localhost:8080/api/v1/distribution

Dashboard 示例

static/index.html

<!DOCTYPE html>
<html>
<head>
    <title>Multi-Cluster Dashboard</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .cluster-card {
            border: 1px solid #ddd;
            padding: 15px;
            margin: 10px 0;
            border-radius: 5px;
        }
        .healthy { border-left: 5px solid #4CAF50; }
        .unhealthy { border-left: 5px solid #f44336; }
        table { border-collapse: collapse; width: 100%; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #326CE5; color: white; }
    </style>
</head>
<body>
    <h1>Multi-Cluster Dashboard</h1>
    
    <h2>Cluster Status</h2>
    <div id="clusters"></div>
    
    <h2>Pod Distribution</h2>
    <canvas id="distributionChart"></canvas>
    
    <h2>All Pods</h2>
    <input type="text" id="searchInput" placeholder="Search by labels (e.g., app=nginx)">
    <button onclick="searchPods()">Search</button>
    <table id="podsTable">
        <thead>
            <tr>
                <th>Cluster</th>
                <th>Namespace</th>
                <th>Pod Name</th>
                <th>Status</th>
                <th>Node</th>
                <th>IP</th>
            </tr>
        </thead>
        <tbody></tbody>
    </table>
    
    <script>
        // Fetches the per-cluster summaries and renders one status card per
        // cluster into #clusters. Text is written with textContent so values
        // from the API are never parsed as HTML.
        async function loadClusters() {
            const response = await fetch('/api/v1/clusters');
            if (!response.ok) {
                console.error(`Failed to load clusters: HTTP ${response.status}`);
                return;
            }
            // Tolerate a null body so an empty result does not throw below.
            const clusters = (await response.json()) || [];

            const container = document.getElementById('clusters');
            container.innerHTML = '';

            clusters.forEach(cluster => {
                const card = document.createElement('div');
                card.className = `cluster-card ${cluster.healthy ? 'healthy' : 'unhealthy'}`;

                const title = document.createElement('h3');
                title.textContent = cluster.cluster_name;
                card.appendChild(title);

                // cpu_capacity is in millicores and memory_capacity in bytes;
                // dividing by 1024**3 gives GiB, not GB.
                const lines = [
                    `Region: ${cluster.region} | Env: ${cluster.env}`,
                    `Nodes: ${cluster.node_count} | Pods: ${cluster.pod_count} | Services: ${cluster.service_count}`,
                    `CPU: ${(cluster.cpu_capacity / 1000).toFixed(2)} cores | Memory: ${(cluster.memory_capacity / (1024**3)).toFixed(2)} GiB`,
                ];
                lines.forEach(text => {
                    const p = document.createElement('p');
                    p.textContent = text;
                    card.appendChild(p);
                });

                container.appendChild(card);
            });
        }
        
        // Fetches every pod from every cluster and fills the body of
        // #podsTable, one row per pod. Cells are written with textContent so
        // values from the API are never parsed as HTML.
        async function loadPods() {
            const response = await fetch('/api/v1/pods');
            if (!response.ok) {
                console.error(`Failed to load pods: HTTP ${response.status}`);
                return;
            }
            // Tolerate a null body so an empty result does not throw below.
            const pods = (await response.json()) || [];

            const tbody = document.querySelector('#podsTable tbody');
            tbody.innerHTML = '';

            pods.forEach(pod => {
                const row = tbody.insertRow();
                // Same order as the table header.
                [pod.cluster_name, pod.namespace, pod.name,
                 pod.status, pod.node_name, pod.pod_ip].forEach(value => {
                    row.insertCell().textContent = value || '';
                });
            });
        }
        
        // Runs a label-selector search using the text in #searchInput (or
        // lists all pods when the input is empty) and fills the body of
        // #podsTable with the result.
        async function searchPods() {
            const labels = document.getElementById('searchInput').value;
            // Selectors can contain characters such as '&', '+', '!' or
            // spaces, which would corrupt the query string if not encoded.
            const url = labels
                ? `/api/v1/pods/search?labels=${encodeURIComponent(labels)}`
                : '/api/v1/pods';

            const response = await fetch(url);
            if (!response.ok) {
                console.error(`Pod search failed: HTTP ${response.status}`);
                return;
            }
            // Tolerate a null body so an empty result does not throw below.
            const pods = (await response.json()) || [];

            const tbody = document.querySelector('#podsTable tbody');
            tbody.innerHTML = '';

            pods.forEach(pod => {
                const row = tbody.insertRow();
                // Always emit all six cells so rows line up with the header,
                // even if the response omits node_name or pod_ip.
                [pod.cluster_name, pod.namespace, pod.name,
                 pod.status, pod.node_name, pod.pod_ip].forEach(value => {
                    row.insertCell().textContent = value || '';
                });
            });
        }
        
        // Initial load
        loadClusters();
        loadPods();
        
        // Periodic refresh: only the cluster cards are reloaded, every 60 s;
        // the pod table is not refreshed automatically.
        setInterval(loadClusters, 60000);
    </script>
</body>
</html>

部署

Kubernetes Deployment

# ConfigMap holding the aggregator's cluster list. The Deployment in this
# manifest mounts it at /etc/config.
apiVersion: v1
kind: ConfigMap
metadata:
  name: cluster-configs
  namespace: kube-system
data:
  clusters.yaml: |
    # 集群配置
---
# Secret holding one kubeconfig per cluster. The Deployment in this manifest
# mounts it at /etc/kubeconfigs, so each key becomes a file in that directory.
# The values are placeholders and must be replaced with base64-encoded
# kubeconfig contents.
# NOTE(review): the clusters.yaml example in this document also references
# /etc/kubeconfigs/dev-us-east.yaml, which has no key here — confirm whether
# it should be added.
apiVersion: v1
kind: Secret
metadata:
  name: kubeconfigs
  namespace: kube-system
type: Opaque
data:
  prod-us-east.yaml: <base64-encoded-kubeconfig>
  prod-eu-west.yaml: <base64-encoded-kubeconfig>
---
# Single-replica Deployment of the aggregator.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: multi-cluster-aggregator
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: multi-cluster-aggregator
  template:
    metadata:
      labels:
        app: multi-cluster-aggregator
    spec:
      containers:
      - name: aggregator
        # NOTE(review): ":latest" is a mutable tag; consider pinning a version.
        image: your-registry/multi-cluster-aggregator:latest
        ports:
        - containerPort: 8080
        # NOTE(review): no resource requests/limits or probes are defined.
        volumeMounts:
        - name: kubeconfigs
          mountPath: /etc/kubeconfigs
        - name: config
          mountPath: /etc/config
      volumes:
      - name: kubeconfigs
        secret:
          secretName: kubeconfigs
      - name: config
        configMap:
          name: cluster-configs
---
# Service exposing the aggregator: port 80 forwards to container port 8080.
# NOTE(review): type LoadBalancer publishes the API outside the cluster, and
# the API code in this document has no authentication — confirm this exposure
# is intended.
apiVersion: v1
kind: Service
metadata:
  name: multi-cluster-aggregator
  namespace: kube-system
spec:
  selector:
    app: multi-cluster-aggregator
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer

总结

功能特性

多集群管理: 统一管理多个集群
并发查询: 高效的资源聚合
跨集群搜索: 全局资源查找
统一 Dashboard: 可视化展示
RESTful API: 易于集成

应用场景

  1. 多区域部署: 统一查看全球集群
  2. 多环境管理: 开发、测试、生产
  3. 成本分析: 跨集群资源对比
  4. 迁移规划: 资源分布分析

扩展方向

  1. 联邦资源管理: 跨集群资源创建和管理
  2. 智能调度: 跨集群负载均衡
  3. 成本优化: 资源使用优化建议
  4. 灾备切换: 跨集群故障转移

下一个项目将介绍 GitOps 自动部署工具。