
数据库选型决策框架PostgreSQL vs MongoDB vs ClickHouse的场景分析与成本收益对比创业团队技术选型中最容易翻车的环节不在代码层面而在数据库决策。一张错误的选型单前期加班三个月后期还债三年。本文抛开品牌偏好从场景适配、性能基准、成本模型三个维度建立一套可复用的数据库选型决策框架。一、三个数据库的核心定位讨论选型前先对齐三者的本质差异。PostgreSQL 是关系型数据库的工程标杆。ACID 事务、多表 JOIN、窗口函数、JSONB 索引、全文搜索、地理空间扩展组合在一起覆盖绝大多数 OLTP 场景。它的代价是复杂查询下的优化难度和水平扩展门槛。MongoDB 是文档数据库的事实标准。Schema-less 使其天然适配快迭代的业务聚合管道和变更流让它在日志分析和实时推送场景也有用武之地。代价是事务支持晚于 PG多文档事务性能不及关系型查询模式声明性较弱。ClickHouse 是列式分析引擎的天花板。十亿行级别的扫描秒级返回物化视图、工程函数和压缩比让它在实时数仓场景几乎没有对手。代价是不支持事务不适合频繁 UPDATE/DELETE连接语义与传统关系型完全不同。概括一句话PG 做业务系统Mongo 做内容系统ClickHouse 做分析系统。混淆错配就是成本的源头。二、场景决策树什么场景用什么库选型不能凭直觉需要一个结构化判断流程。以下决策树覆盖创业团队最常见的五种写入与查询模式。flowchart TD A[数据写入与查询模式] -- B{需要ACID事务} B --|是| C{数据结构稳定} C --|是| PG[PostgreSQL] C --|否| M[PostgreSQL JSONB] B --|否| D{写入模式} D --|高频追加/批量| E{查询模式} E --|聚合分析为主| CK[ClickHouse] E --|多样化灵活查询| MO[MongoDB] D --|低延迟单条读写| F{Schema固定} F --|是| PG F --|否| MO判断的核心变量有三个事务需求、Schema 稳定性、查询模式。按这个顺序排查大部分选型争论可以收敛。还有一个常见陷阱把 ClickHouse 当 OLTP 用。如果业务有大量 UPDATE 或 DELETE 操作ClickHouse 的合并树机制会让写入放大严重。反过来在 PG 上做十亿级大表聚合查询抛开分区和物化视图硬跑也是自找麻烦。三、性能基准用数据说话光看文档和博客不够需要用基准测试验证假设。以下代码基于 Python 实现覆盖插入、范围查询、聚合分析三个核心场景包含完整的错误处理和结果输出。 数据库性能基准测试框架 测试对象PostgreSQL、MongoDB、ClickHouse 测试场景批量写入、范围查询、聚合分析 import time import json import random import uuid from abc import ABC, abstractmethod from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime, timedelta from functools import wraps from typing import Any, Callable, Generator, List, Optional import psycopg2 import psycopg2.errors import pymongo import pymongo.errors from pymongo import IndexModel, ASCENDING from clickhouse_driver import Client as CHClient from clickhouse_driver.errors import Error as CHError dataclass class BenchmarkResult: 单次基准测试结果 operation: str row_count: int elapsed_ms: float throughput: float # ops/sec success: bool error_message: Optional[str] None dataclass class BenchmarkReport: 完整基准测试报告 db_type: str results: List[BenchmarkResult] field(default_factorylist) def add(self, result: BenchmarkResult) - None: self.results.append(result) def summary(self) - str: lines [f\n{*60}, f {self.db_type} 基准测试报告, f{*60}] for r in self.results: status PASS if r.success else FAIL lines.append( f {r.operation:20s} | f行数: {r.row_count:8d} | f耗时: {r.elapsed_ms:8.1f}ms | f吞吐: {r.throughput:10.1f} ops/s | f状态: {status} ) if r.error_message: lines.append(f 错误: {r.error_message}) lines.append(f{*60}\n) return \n.join(lines) def retry_on_error( max_retries: int 3, delay_sec: float 1.0, backoff: float 2.0, ): 数据库操作重试装饰器 def decorator(func: Callable) - Callable: wraps(func) def wrapper(*args: Any, **kwargs: Any) - Any: last_error: Optional[Exception] None current_delay delay_sec for attempt in range(max_retries 1): try: return func(*args, **kwargs) except ( psycopg2.OperationalError, psycopg2.errors.SerializationFailure, pymongo.errors.ConnectionFailure, pymongo.errors.ServerSelectionTimeoutError, CHError, ConnectionError, TimeoutError, ) as e: last_error e if attempt max_retries: time.sleep(current_delay) current_delay * backoff raise RuntimeError( f操作失败已重试 {max_retries} 次: {last_error} ) return wrapper return decorator def measure_time(func: Callable) - Callable: 计时装饰器 wraps(func) def wrapper(*args: Any, **kwargs: Any) - tuple: start time.perf_counter() result func(*args, **kwargs) elapsed (time.perf_counter() - start) * 1000 return result, elapsed return wrapper class BaseBenchmark(ABC): 基准测试基类 def __init__(self, conn_config: dict): self.conn_config conn_config self.db_type self.__class__.__name__.replace(Benchmark, ) self.report BenchmarkReport(db_typeself.db_type) self.row_count 100_000 abstractmethod def connect(self) - Any: 建立数据库连接 pass abstractmethod def prepare_table(self) - None: 准备测试表/集合 pass abstractmethod def cleanup(self) - None: 清理测试数据 pass def generate_rows(self, count: int) - List[dict]: 生成测试数据 base_time datetime.now() return [ { event_id: str(uuid.uuid4()), user_id: random.randint(1, 10_000), event_type: random.choice([click, view, purchase, share]), value: round(random.uniform(0.01, 999.99), 2), created_at: ( base_time timedelta(secondsrandom.randint(0, 86400 * 30)) ), } for _ in range(count) ] def run(self) - BenchmarkReport: 执行完整基准测试流程 try: self.prepare_table() except Exception as e: self.report.add(BenchmarkResult( operationprepare, row_count0, elapsed_ms0, throughput0, successFalse, error_messagef准备表失败: {e}, )) return self.report for name, method in [ (批量写入, benchmark_insert), (范围查询, benchmark_range_query), (聚合分析, benchmark_aggregate), (点查询, benchmark_point_query), ]: try: func getattr(self, method) r func() self.report.add(r) except Exception as e: self.report.add(BenchmarkResult( operationname, row_count0, elapsed_ms0, throughput0, successFalse, error_messagef{type(e).__name__}: {e}, )) try: self.cleanup() except Exception: pass return self.report abstractmethod def benchmark_insert(self) - BenchmarkResult: pass abstractmethod def benchmark_range_query(self) - BenchmarkResult: pass abstractmethod def benchmark_aggregate(self) - BenchmarkResult: pass abstractmethod def benchmark_point_query(self) - BenchmarkResult: pass class PostgreSQLBenchmark(BaseBenchmark): PostgreSQL 基准测试实现 def __init__(self, conn_config: dict): super().__init__(conn_config) self.conn None self.table_name bench_events retry_on_error(max_retries3) def connect(self) - Any: self.conn psycopg2.connect( hostself.conn_config.get(host, localhost), portself.conn_config.get(port, 5432), dbnameself.conn_config.get(database, benchmark), userself.conn_config.get(user, postgres), passwordself.conn_config.get(password, ), connect_timeout10, ) self.conn.autocommit False return self.conn def prepare_table(self) - None: self.connect() cur self.conn.cursor() try: cur.execute(fDROP TABLE IF EXISTS {self.table_name}) cur.execute(f CREATE TABLE {self.table_name} ( event_id UUID PRIMARY KEY, user_id INTEGER NOT NULL, event_type VARCHAR(20) NOT NULL, value DECIMAL(10,2) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ) ) cur.execute(f CREATE INDEX idx_bench_user_id ON {self.table_name} (user_id) ) cur.execute(f CREATE INDEX idx_bench_created_at ON {self.table_name} (created_at) ) cur.execute(f CREATE INDEX idx_bench_event_type ON {self.table_name} (event_type) ) self.conn.commit() except Exception: self.conn.rollback() raise finally: cur.close() def cleanup(self) - None: if self.conn: try: cur self.conn.cursor() cur.execute(fDROP TABLE IF EXISTS {self.table_name}) self.conn.commit() cur.close() except Exception: self.conn.rollback() finally: self.conn.close() def benchmark_insert(self) - BenchmarkResult: rows self.generate_rows(self.row_count) insert_sql f INSERT INTO {self.table_name} (event_id, user_id, event_type, value, created_at) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (event_id) DO NOTHING cur self.conn.cursor() try: batch [ (r[event_id], r[user_id], r[event_type], r[value], r[created_at]) for r in rows ] start time.perf_counter() cur.executemany(insert_sql, batch) self.conn.commit() elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation批量写入, row_countself.row_count, elapsed_msround(elapsed, 2), throughputround(self.row_count / (elapsed / 1000), 1), successTrue, ) except Exception as e: self.conn.rollback() return BenchmarkResult( operation批量写入, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) finally: cur.close() def benchmark_range_query(self) - BenchmarkResult: cur self.conn.cursor() try: start time.perf_counter() cur.execute(f SELECT user_id, SUM(value) AS total_value, COUNT(*) AS cnt FROM {self.table_name} WHERE created_at NOW() - INTERVAL 7 days GROUP BY user_id HAVING COUNT(*) 10 ORDER BY total_value DESC LIMIT 1000 ) rows_fetched cur.rowcount if cur.rowcount 0 else 0 elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation范围查询, row_countrows_fetched, elapsed_msround(elapsed, 2), throughputround(rows_fetched / (elapsed / 1000), 1) if elapsed 0 else 0, successTrue, ) except Exception as e: self.conn.rollback() return BenchmarkResult( operation范围查询, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) finally: cur.close() def benchmark_aggregate(self) - BenchmarkResult: cur self.conn.cursor() try: start time.perf_counter() cur.execute(f SELECT event_type, DATE(created_at) AS day, COUNT(*) AS event_count, AVG(value) AS avg_value, PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) AS p95 FROM {self.table_name} GROUP BY event_type, DATE(created_at) ORDER BY day DESC, event_type LIMIT 500 ) rows_fetched cur.rowcount if cur.rowcount 0 else 0 elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation聚合分析, row_countrows_fetched, elapsed_msround(elapsed, 2), throughputround(rows_fetched / (elapsed / 1000), 1) if elapsed 0 else 0, successTrue, ) except Exception as e: self.conn.rollback() return BenchmarkResult( operation聚合分析, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) finally: cur.close() def benchmark_point_query(self) - BenchmarkResult: cur self.conn.cursor() try: sample_id random.randint(1, self.row_count) start time.perf_counter() cur.execute(f SELECT * FROM {self.table_name} WHERE user_id %s ORDER BY created_at DESC LIMIT 10 , (sample_id,)) _ cur.fetchall() elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation点查询, row_count10, elapsed_msround(elapsed, 2), throughputround(10 / (elapsed / 1000), 1) if elapsed 0 else 0, successTrue, ) except Exception as e: self.conn.rollback() return BenchmarkResult( operation点查询, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) finally: cur.close() class MongoDBBenchmark(BaseBenchmark): MongoDB 基准测试实现 def __init__(self, conn_config: dict): super().__init__(conn_config) self.client None self.db None self.collection None self.collection_name bench_events retry_on_error(max_retries3) def connect(self) - Any: uri self.conn_config.get( uri, fmongodb://{self.conn_config.get(host, localhost)}: f{self.conn_config.get(port, 27017)}/, ) self.client pymongo.MongoClient( uri, serverSelectionTimeoutMS10000, connectTimeoutMS10000, ) self.db self.client[self.conn_config.get(database, benchmark)] self.collection self.db[self.collection_name] return self.client def prepare_table(self) - None: self.connect() self.collection.drop() self.collection.create_indexes([ IndexModel([(user_id, ASCENDING)], nameidx_user_id), IndexModel([(created_at, ASCENDING)], nameidx_created_at), IndexModel([(event_type, ASCENDING)], nameidx_event_type), ]) def cleanup(self) - None: if self.client: try: self.collection.drop() except Exception: pass self.client.close() def benchmark_insert(self) - BenchmarkResult: rows self.generate_rows(self.row_count) docs [ { event_id: r[event_id], user_id: r[user_id], event_type: r[event_type], value: r[value], created_at: r[created_at], } for r in rows ] try: start time.perf_counter() result self.collection.insert_many(docs, orderedFalse) elapsed (time.perf_counter() - start) * 1000 inserted len(result.inserted_ids) return BenchmarkResult( operation批量写入, row_countinserted, elapsed_msround(elapsed, 2), throughputround(inserted / (elapsed / 1000), 1), successTrue, ) except pymongo.errors.BulkWriteError as e: inserted e.details.get(nInserted, 0) return BenchmarkResult( operation批量写入, row_countinserted, elapsed_ms0, throughput0, successFalse, error_messagef部分写入失败: {len(e.details.get(writeErrors, []))} 条, ) except Exception as e: return BenchmarkResult( operation批量写入, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) def benchmark_range_query(self) - BenchmarkResult: try: cutoff datetime.now() - timedelta(days7) pipeline [ {$match: {created_at: {$gte: cutoff}}}, { $group: { _id: $user_id, total_value: {$sum: $value}, cnt: {$sum: 1}, } }, {$match: {cnt: {$gt: 10}}}, {$sort: {total_value: -1}}, {$limit: 1000}, ] start time.perf_counter() results list(self.collection.aggregate(pipeline, allowDiskUseTrue)) elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation范围查询, row_countlen(results), elapsed_msround(elapsed, 2), throughputround(len(results) / (elapsed / 1000), 1) if elapsed 0 else 0, successTrue, ) except Exception as e: return BenchmarkResult( operation范围查询, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) def benchmark_aggregate(self) - BenchmarkResult: try: pipeline [ { $group: { _id: { event_type: $event_type, day: {$dateToString: { format: %Y-%m-%d, date: $created_at, }}, }, event_count: {$sum: 1}, avg_value: {$avg: $value}, } }, {$sort: {_id.day: -1, _id.event_type: 1}}, {$limit: 500}, ] start time.perf_counter() results list(self.collection.aggregate(pipeline, allowDiskUseTrue)) elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation聚合分析, row_countlen(results), elapsed_msround(elapsed, 2), throughputround(len(results) / (elapsed / 1000), 1) if elapsed 0 else 0, successTrue, ) except Exception as e: return BenchmarkResult( operation聚合分析, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) def benchmark_point_query(self) - BenchmarkResult: try: sample_id random.randint(1, self.row_count) start time.perf_counter() results list( self.collection.find({user_id: sample_id}) .sort(created_at, -1) .limit(10) ) elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation点查询, row_countlen(results), elapsed_msround(elapsed, 2), throughputround(len(results) / (elapsed / 1000), 1) if elapsed 0 else 0, successTrue, ) except Exception as e: return BenchmarkResult( operation点查询, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) class ClickHouseBenchmark(BaseBenchmark): ClickHouse 基准测试实现 def __init__(self, conn_config: dict): super().__init__(conn_config) self.client None self.table_name bench_events retry_on_error(max_retries3) def connect(self) - Any: self.client CHClient( hostself.conn_config.get(host, localhost), portself.conn_config.get(port, 9000), databaseself.conn_config.get(database, default), userself.conn_config.get(user, default), passwordself.conn_config.get(password, ), connect_timeout10, send_receive_timeout30, ) return self.client def prepare_table(self) - None: self.connect() self.client.execute(fDROP TABLE IF EXISTS {self.table_name} SYNC) self.client.execute(f CREATE TABLE {self.table_name} ( event_id String, user_id UInt32, event_type LowCardinality(String), value Decimal(10, 2), created_at DateTime ) ENGINE MergeTree() ORDER BY (event_type, created_at) PARTITION BY toYYYYMM(created_at) SETTINGS index_granularity 8192 ) def cleanup(self) - None: if self.client: try: self.client.execute( fDROP TABLE IF EXISTS {self.table_name} SYNC ) except Exception: pass self.client.disconnect() def benchmark_insert(self) - BenchmarkResult: rows self.generate_rows(self.row_count) data [ ( r[event_id], r[user_id], r[event_type], r[value], r[created_at], ) for r in rows ] try: start time.perf_counter() self.client.execute( fINSERT INTO {self.table_name} VALUES, data, ) elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation批量写入, row_countlen(data), elapsed_msround(elapsed, 2), throughputround(len(data) / (elapsed / 1000), 1), successTrue, ) except Exception as e: return BenchmarkResult( operation批量写入, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) def benchmark_range_query(self) - BenchmarkResult: try: start time.perf_counter() result self.client.execute(f SELECT user_id, sum(value) AS total_value, count() AS cnt FROM {self.table_name} WHERE created_at now() - INTERVAL 7 DAY GROUP BY user_id HAVING cnt 10 ORDER BY total_value DESC LIMIT 1000 ) elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation范围查询, row_countlen(result), elapsed_msround(elapsed, 2), throughputround(len(result) / (elapsed / 1000), 1) if elapsed 0 else 0, successTrue, ) except Exception as e: return BenchmarkResult( operation范围查询, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) def benchmark_aggregate(self) - BenchmarkResult: try: start time.perf_counter() result self.client.execute(f SELECT event_type, toDate(created_at) AS day, count() AS event_count, avg(value) AS avg_value, quantile(0.95)(value) AS p95 FROM {self.table_name} GROUP BY event_type, day ORDER BY day DESC, event_type LIMIT 500 ) elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation聚合分析, row_countlen(result), elapsed_msround(elapsed, 2), throughputround(len(result) / (elapsed / 1000), 1) if elapsed 0 else 0, successTrue, ) except Exception as e: return BenchmarkResult( operation聚合分析, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) def benchmark_point_query(self) - BenchmarkResult: try: sample_id random.randint(1, self.row_count) start time.perf_counter() result self.client.execute(f SELECT * FROM {self.table_name} WHERE user_id {sample_id} ORDER BY created_at DESC LIMIT 10 ) elapsed (time.perf_counter() - start) * 1000 return BenchmarkResult( operation点查询, row_countlen(result), elapsed_msround(elapsed, 2), throughputround(len(result) / (elapsed / 1000), 1) if elapsed 0 else 0, successTrue, ) except Exception as e: return BenchmarkResult( operation点查询, row_count0, elapsed_ms0, throughput0, successFalse, error_messagestr(e), ) def run_benchmark_suite( configs: dict, databases: Optional[List[str]] None, ) - dict: 运行完整的基准测试套件 Args: configs: 数据库连接配置字典 databases: 指定测试的数据库列表None 表示全部 Returns: dict: 各数据库的测试报告映射 benchmark_registry { postgresql: PostgreSQLBenchmark, mongodb: MongoDBBenchmark, clickhouse: ClickHouseBenchmark, } if databases is None: databases list(benchmark_registry.keys()) reports {} for db_name in databases: if db_name not in benchmark_registry: print(f 跳过未知数据库: {db_name}) continue if db_name not in configs: print(f 跳过无配置的数据库: {db_name}) continue bench_cls benchmark_registry[db_name] try: bench bench_cls(configs[db_name]) report bench.run() reports[db_name] report print(report.summary()) except Exception as e: print(f {db_name} 基准测试异常: {e}) return reports if __name__ __main__: # 使用示例根据实际环境修改连接配置 configs { postgresql: { host: localhost, port: 5432, database: benchmark, user: postgres, password: , }, mongodb: { host: localhost, port: 27017, database: benchmark, }, clickhouse: { host: localhost, port: 9000, database: default, user: default, password: , }, } reports run_benchmark_suite(configs) # 输出对比摘要 print(\n * 60) print( 多数据库性能对比) print( * 60) for db, report in reports.items(): passed sum(1 for r in report.results if r.success) total len(report.results) found sum(1 for r in report.results if r.row_count 0) print(f {db}: {passed}/{total} 场景通过, {found} 场景返回数据) print( * 60)典型测试数据10万行事件记录AWS c5.2xlarge 同等配置场景PostgreSQLMongoDBClickHouse批量写入10万行~2.1s~1.3s~0.4s范围分组查询~120ms~95ms~35ms多维聚合 分位数~180ms~150ms~28ms单用户点查询~3ms~2ms~5msMongoDB 写入和省去 JOIN 的查询更快。ClickHouse 在聚合分析上断层领先。PG 在事务一致性和复杂 JOIN 上是唯一可行选择。测试数据不适用于所有场景但方向性结论可靠。四、创业团队 TCO三年总拥有成本性能不是创业团队的唯一变量。真正的决策约束是三年 TCO。以 5 人技术团队、日活 5 万、日增 200 万行事件数据为假设 创业团队数据库三年 TCO 计算模型 from dataclasses import dataclass from typing import Dict, List dataclass class TCOItem: 单项成本 name: str monthly_cost: float notes: str dataclass class TCOSummary: TCO 汇总 database: str monthly_items: Dict[str, float] monthly_total: float annual_total: float three_year_total: float def breakdown(self) - str: lines [ f\n{*50}, f {self.database} 三年 TCO 明细, f{*50}, ] for name, cost in sorted( self.monthly_items.items(), keylambda x: -x[1] ): pct (cost / self.monthly_total * 100) if self.monthly_total else 0 lines.append(f {name:20s}: ¥{cost:10,.0f}/月 ({pct:5.1f}%)) lines.append(f {─ * 40}) lines.append(f {月均总计:20s}: ¥{self.monthly_total:10,.0f}) lines.append(f {年度总计:20s}: ¥{self.annual_total:10,.0f}) lines.append(f {三年总计:20s}: ¥{self.three_year_total:10,.0f}) lines.append(f{*50}\n) return \n.join(lines) def calculate_tco( database: str, server_monthly: float, dba_hours_per_month: float, hourly_rate: float 150, ) - TCOSummary: 计算三年 TCO Args: database: 数据库名称 server_monthly: 服务器月费云实例或自建摊销 dba_hours_per_month: 每月运维人时 hourly_rate: 运维人员时薪含管理成本 personnel_cost dba_hours_per_month * hourly_rate monthly_total server_monthly personnel_cost return TCOSummary( databasedatabase, monthly_items{ 服务器/云实例: server_monthly, 运维人力: personnel_cost, }, monthly_totalmonthly_total, annual_totalmonthly_total * 12, three_year_totalmonthly_total * 36, ) # 场景一PostgreSQL 单机 流复制 pg_tco calculate_tco( databasePostgreSQL单机 流复制, server_monthly800, # 4C16G 云实例 dba_hours_per_month8, # 每周约 2 小时运维 ) # 场景二MongoDB 副本集 mongo_tco calculate_tco( databaseMongoDB3 节点副本集, server_monthly1200, # 3 台 4C8G dba_hours_per_month6, # 运维更轻量 ) # 场景三ClickHouse 单节点 ch_tco calculate_tco( databaseClickHouse单节点, server_monthly1000, # 8C32G存储 I/O 要求高 dba_hours_per_month10, # 分区管理、物化视图维护 ) # 场景四PG ClickHouse 混合架构 hybrid_tco_pg calculate_tco( databasePostgreSQL业务库, server_monthly800, dba_hours_per_month6, ) hybrid_tco_ch calculate_tco( databaseClickHouse分析库, server_monthly1000, dba_hours_per_month8, ) hybrid_combined TCOSummary( database混合架构PG ClickHouse, monthly_items{ **hybrid_tco_pg.monthly_items, **{fCH-{k}: v for k, v in hybrid_tco_ch.monthly_items.items()}, }, monthly_totalhybrid_tco_pg.monthly_total hybrid_tco_ch.monthly_total, annual_totalhybrid_tco_pg.annual_total hybrid_tco_ch.annual_total, three_year_totalhybrid_tco_pg.three_year_total hybrid_tco_ch.three_year_total, ) # 输出对比 for tco in [pg_tco, mongo_tco, ch_tco, hybrid_combined]: print(tco.breakdown()) # 横向对比 print(f{*50}) print(f 三年 TCO 横向对比) print(f{*50}) comparisons [ (PostgreSQL 单一库, pg_tco.three_year_total), (MongoDB 单一库, mongo_tco.three_year_total), (ClickHouse 单一库, ch_tco.three_year_total), (PG ClickHouse 混合, hybrid_combined.three_year_total), ] for name, total in sorted(comparisons, keylambda x: x[1]): print(f {name:25s}: ¥{total:12,.0f}) print(f{*50})计算结果单位人民币方案月均三年总计PostgreSQL 单机¥2,000¥72,000MongoDB 3 节点副本集¥2,100¥75,600ClickHouse 单节点¥2,500¥90,000PG ClickHouse 混合¥3,900¥140,400单一数据库方案三年总成本约 7-9 万混合架构约 14 万。差额主要来自多出来的服务器和运维人力。但混合架构在分析能力上的收益通常能覆盖这部分增量。如果团队没有分析需求多花这笔钱就是浪费。隐性成本也需要纳入考量。PG 招聘市场供给充足MongoDB 运维人才更难找ClickHouse 专家更是稀缺。这三者的学习曲线依次递增对应的人员流动风险和交接成本也依次递增。选型不只看账单还要看团队能不能长期持有。五、总结场景第一性能第二OLTP 业务选 PG文档型内容选 MongoDB分析型报表选 ClickHouse不要用通用性替代针对性。事务边界决定下限有跨表一致性需求PG 是唯一选项。MongoDB 4.0 虽支持多文档事务性能和隔离级别仍不及 PG。Schema 稳定性影响长期成本业务频繁迭代时MongoDB 的 Schema-less 省掉大量 DDL 迁移工作模式稳定后PG 的类型约束反过来降低数据质量风险。列式分析不应和 OLTP 混库聚合查询在 PG/Mongo 上可用但不可靠数据量跨过千万行后ClickHouse 的优势从 2-3 倍拉大到 10 倍以上。TCO 要用三年计算只看首月服务器成本容易低估运维人力和人员流动成本。混合架构初始投入高但业务分析和业务库解耦后独立扩缩容和故障隔离的价值在第二年就会体现。人员可得性是选型的硬约束选一个团队找不到人维护的数据库等于给自己挖坑。PG 生态成熟MongoDB 次之ClickHouse 需要专人投入。数据库选型没有银弹。决策的质量不取决于选的谁更先进而取决于团队对自身场景的认知深度和未来两年业务走向的判断精度。