Cách xây dựng Knowledge Base hiệu quả cho AI Models

AI chỉ mạnh khi nền tảng tri thức phía sau đủ tốt. Một knowledge base được xây dựng bài bản không chỉ giúp mô hình trả lời chính xác hơn mà còn cải thiện đáng kể tốc độ phản hồi — hai điểm yếu mà nhiều AI hiện nay vẫn gặp phải. Theo một nghiên cứu gần đây, nhiều chatbot AI lớn hiện vẫn trả lời sai gần một nửa số truy vấn người dùng đưa ra.

Chính vì vậy, việc xây dựng knowledge base không còn là một phần “phụ trợ”, mà gần như là yếu tố quyết định chất lượng của toàn bộ hệ thống AI.

1. Bắt đầu từ dữ liệu phù hợp, không phải dữ liệu thật nhiều

Một trong những sai lầm phổ biến nhất khi xây knowledge base là cho rằng càng nhiều dữ liệu thì AI càng thông minh. Thực tế, điều này rất dễ dẫn tới tình trạng “garbage in, garbage out” — dữ liệu kém chất lượng sẽ tạo ra kết quả kém chất lượng.

Điều quan trọng không phải số lượng, mà là độ liên quan của dữ liệu với mục tiêu hệ thống. Một knowledge base tốt thường chỉ tập trung vào những nội dung AI thực sự cần để trả lời đúng.

Ví dụ, nếu đang xây chatbot hỗ trợ khách hàng, hệ thống có thể chỉ cần tài liệu chính sách công ty, quy trình xử lý sự cố hoặc hướng dẫn sử dụng sản phẩm. Điều này giúp AI không “bịa” thêm thông tin ngoài phạm vi cho phép.

Lưu ý rằng hiện có xu hướng dùng dữ liệu do AI tạo ra để xây knowledge base cho AI khác. Cách này giúp tăng tốc rất nhanh, nhưng cũng tiềm ẩn rủi ro vì nội dung có thể chứa lỗi, thông tin thừa hoặc diễn đạt quá dài dòng. Vì vậy, mọi dữ liệu AI-generated đều nên được kiểm tra lại trước khi đưa vào hệ thống.

2. Làm sạch và chia nhỏ dữ liệu là bước cực kỳ quan trọng

Sau khi thu thập dữ liệu, bước tiếp theo là làm sạch nội dung. Quá trình này thường bao gồm việc xóa dữ liệu trùng lặp, loại bỏ thông tin lỗi thời, đồng thời chuẩn hóa thuật ngữ và định dạng để toàn bộ knowledge base có tính nhất quán.

Sau đó, dữ liệu sẽ được chia thành các “chunk” nhỏ. Mỗi chunk chỉ nên chứa một ý hoặc một chủ đề rõ ràng để AI dễ tìm kiếm và truy xuất hơn.

Bạn nên chia chunk theo kiểu câu hỏi người dùng thực tế thay vì chia theo cấu trúc tài liệu truyền thống. Ví dụ, thay vì chia theo “Chương quản lý tài khoản”, có thể tách thành các nội dung như:
“Làm sao đổi mật khẩu?” hoặc “Chính sách mật khẩu là gì?”.

Cách tiếp cận này giúp AI phản hồi gần với nhu cầu thật của người dùng hơn rất nhiều.

3. Metadata và vector hóa giúp AI “hiểu” dữ liệu nhanh hơn

Sau khi dữ liệu được chia nhỏ, mỗi chunk thường sẽ được gắn thêm metadata như nguồn dữ liệu, chủ đề, ngày cập nhật hoặc quyền truy cập. Metadata giúp hệ thống lọc và tìm đúng nội dung nhanh hơn thay vì phải quét toàn bộ knowledge base.

Tiếp theo, văn bản sẽ được chuyển thành vector thông qua embedding model như OpenAI v3-Large hoặc BGE-M3. Đây là bước rất quan trọng vì AI xử lý vector nhanh hơn nhiều so với văn bản thô.

Một chunk hoàn chỉnh thường sẽ bao gồm:

  • Vector embedding
  • Nội dung gốc
  • Metadata đi kèm

Đây cũng là nền tảng của hầu hết hệ thống RAG hiện nay.

4. Chọn đúng vector database và tối ưu retrieval

Sau khi vector hóa, dữ liệu thường được lưu trong các vector database như Pinecone, Milvus hoặc Weaviate. Những hệ thống này được thiết kế riêng để truy xuất vector theo ngữ nghĩa. Bạn có thể tải lên dữ liệu vector bằng cách viết một đoạn mã Python đơn giản.

 import math
 import time
 import json
 from dataclasses import dataclass, field
 from typing import Any

 import numpy as np

 # Vector Normalization + Metadata

 def normalize_l2(vector: list[float]) -> list[float]:
 """
 Return an L2-normalized copy of `vec`.
 Many vector stores use dot-product similarity. If you normalize vectors to
 unit length, dot-product becomes equivalent to cosine similarity.
 """
 arr = np.array(vector, dtype=np.float32)
 norm = np.linalg.norm(arr)
 if norm == 0:
 return vector
 return (arr / norm).tolist()

 def prepare_record(
 doc_id: str,
 embedding: list[float],
 text: str,
 source: str,
 extra_metadata: dict[str, Any] | None = None,
 ) -> dict:
 """
 Prepare a single record for vector DB upsert.
 Metadata serves two purposes:
 - Filtering: narrow down search to a subset
 """
 metadata = {
 "source": source,
 "text_preview": text[:500],
 "char_count": len(text),
 }
 if extra_metadata:
 metadata.update(extra_metadata)

 return {
 "id": doc_id,
 "values": normalize_l2(embedding),
 "metadata": metadata,
 }

# Vector Quantization

 # Scalar Quantization / SQ

 def scalar_quantization(input_vec) -> dict:
 """
 This funtion demonstrates
 how to compress float32 input_vec to uint8
 """
 input_arr = np.array(input_vec, dtype=np.float32)
 min, max = input_arr.min(), input_arr.max()
 range = (max - min)
 if range == 0:
 quantized = np.zeros_like(arr, dtype=np.uint8)
 else:
 quantized = ((input_arr - min) / range * 255).astype(np.uint8)

 return {
 "quantized": quantized.tolist(),
 "min": float(min),
 "max": float(max),
 }

 def scalar_dequantization(record: dict) -> list[float]:
 """
 You can Reconstruct the original vector
 by approximate float32 vector from uint8.
 """
 arr = np.array(record["quantized"], dtype=np.float32)
 return (arr / 255 * (record["max"] - record["min"]) + record["min"]).tolist()

 # Product Quantization / PQ

 def train_product_quantizer( vectors, num_subvectors: int = 8, num_centroids: int = 256, max_iterations: int = 20) -> list:
 """
 This function demonstrates
 split vector into subvectors, cluster each independently
 """
 from sklearn.cluster import KMeans

 dim = vectors.shape[1]
 assert dim % num_subvectors == 0, "dim must be divisible by num_subvectors"
 sub_dim = dim // num_subvectors

 codebooks = []
 for i in range(num_subvectors):
 sub_vectors = vectors[:, i * sub_dim : (i + 1) * sub_dim]
 kmeans = KMeans(n_clusters=num_centroids, max_iter=max_iterations, n_init=1)
 kmeans.fit(sub_vectors)
 codebooks.append(kmeans.cluster_centers_)

 return codebooks

 def pq_encode(vector: np.ndarray, codebooks: list[np.ndarray]) -> list[int]:
 """
 Encode a single vector into PQ codes (one uint8 per subvector)
 """
 num_subvectors = len(codebooks)
 sub_dim = len(vector) // num_subvectors
 codes = []

 for i, codebook in enumerate(codebooks):
 sub_vec = vector[i * sub_dim : (i + 1) * sub_dim]
 distances = np.linalg.norm(codebook - sub_vec, axis=1)
 codes.append(int(np.argmin(distances)))

 return codes

 def pq_decode(codes: list[int], codebooks: list[np.ndarray]) -> np.ndarray:
 """
 Reconstruct approximate vector from PQ codes
 """
 return np.concatenate(
 [codebook[code] for code, codebook in zip(codes, codebooks)]
 )

Nhiều developer thường chỉ tập trung “làm cho chạy được” mà quên tối ưu retrieval. Trong khi thực tế, người dùng không chỉ muốn AI trả lời đúng mà còn muốn phản hồi gần như ngay lập tức.

Để truy xuất dữ liệu từ cơ sở dữ liệu vector, bạn có thể sử dụng các framework điều phối như LlamaIndex và LangChain.

LlamaIndex có thể duyệt qua cơ sở dữ liệu vector nhanh hơn và tìm đến chính xác đoạn dữ liệu chứa nội dung liên quan đến truy vấn của người dùng.

Sau đó, LangChain sẽ lấy dữ liệu từ đoạn đó và chuyển đổi nó theo truy vấn của người dùng. Ví dụ: tóm tắt văn bản hoặc viết email từ dữ liệu đó.

"""
 Hybrid Retrieval: Take benefits from both keyword search and vector similarity

 Where each approach shines:
 - Keywords: looks for exact matches, but will miss searches with synonym
 - Embeddings: has advantage of capturing the meaning, but there is possibility of missing exact keyword
 Hybrid is a combination of both to get the best of each.
 """

 import math
 from collections import defaultdict
 from dataclasses import dataclass
 import numpy as np

 @dataclass
 class Document:
 id: str
 text: str
 embedding: list[float]

 class BestMatching25Index:
 def __init__(self, k1: float = 1.5, b: float = 0.75):
 # Here k1 is the term frequency saturation limit
 # and b is length of normalization
 self.k1 = k1
 self.b = b
 self.doc_lengths: dict[str, int] = {}
 self.avg_doc_length: float = 0
 self.doc_freqs: dict[str, int] = {}
 self.term_freqs: dict[str, dict[str, int]] = {}
 self.corpus_size: int = 0

 def _tokenize(self, text: str) -> list[str]:
 return text.lower().split()

 def index(self, documents: list[Document]) -> None:
 self.corpus_size = len(documents)

 for doc in documents:
 tokens = self._tokenize(doc.text)
 self.doc_lengths[doc.id] = len(tokens)
 self.term_freqs[doc.id] = {}

 seen_terms: set[str] = set()
 for token in tokens:
 self.term_freqs[doc.id][token] = self.term_freqs[doc.id].get(token, 0) + 1
 if token not in seen_terms:
 self.doc_freqs[token] = self.doc_freqs.get(token, 0) + 1
 seen_terms.add(token)

 self.avg_doc_length = sum(self.doc_lengths.values()) / self.corpus_size

 def score(self, query: str, doc_id: str) -> float:
 query_terms = self._tokenize(query)
 doc_len = self.doc_lengths[doc_id]
 score = 0.0

 for term in query_terms:
 if term not in self.doc_freqs or term not in self.term_freqs.get(doc_id, {}):
 continue

 tf = self.term_freqs[doc_id][term]
 df = self.doc_freqs[term]
 idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
 tf_norm = (tf * (self.k1 + 1)) / (
 tf + self.k1 * (1 - self.b + self.b * doc_len / self.avg_doc_length)
 )
 score += idf * tf_norm

 return score

 def search(self, query: str, top_k: int = 10) -> list[tuple[str, float]]:
 scores = [
 (doc_id, self.score(query, doc_id))
 for doc_id in self.doc_lengths
 ]
 scores.sort(key=lambda x: x[1], reverse=True)
 return scores[:top_k]

 class VectorIndex:
 """This class implements the smart search using the hybrid search.
 The index function normalize and stores the document
 search implements a cosine similarity search
 hybrid_search_weighted merges BM25 index and vector index using weighted average
 Reciprocal_rank_fusion Combines the results in an efficient way
 """

 def __init__(self):
 self.documents: dict[str, np.ndarray] = {}

 def index(self, documents: list[Document]) -> None:
 for doc in documents:
 arr = np.array(doc.embedding, dtype=np.float32)
 norm = np.linalg.norm(arr)
 self.documents[doc.id] = arr / norm if norm > 0 else arr

 def search(self, query_embedding: list[float], top_k: int = 10) -> list[tuple[str, float]]:
 q = np.array(query_embedding, dtype=np.float32)
 q = q / np.linalg.norm(q)

 scores = [
 (doc_id, float(np.dot(q, emb)))
 for doc_id, emb in self.documents.items()
 ]
 scores.sort(key=lambda x: x[1], reverse=True)
 return scores[:top_k]

 def hybrid_search_weighted(
 query: str,
 query_embedding: list[float],
 bm25_index: BestMatching25Index,
 vector_index: VectorIndex,
 alpha: float = 0.5,
 top_k: int = 10,
 ) -> list[dict]:
 """Combine keyword and vector scores with a tunable weight.

 alpha = 1.0 → pure vector search
 alpha = 0.0 → pure keyword search
 alpha = 0.5 → equal weight (good starting point)
 """
 keyword_results = bm25_index.search(query, top_k=top_k * 2)
 vector_results = vector_index.search(query_embedding, top_k=top_k * 2)

 # Normalize (min-max) each score list to [0, 1]
 def normalize_scores(results: list[tuple[str, float]]) -> dict[str, float]:
 if not results:
 return {}
 scores = [s for _, s in results]
 min_s, max_s = min(scores), max(scores)
 rng = max_s - min_s
 if rng == 0:
 return {doc_id: 1.0 for doc_id, _ in results}
 return {doc_id: (s - min_s) / rng for doc_id, s in results}

 keyword_scores = normalize_scores(keyword_results)
 vector_scores = normalize_scores(vector_results)

 # Merge
 all_doc_ids = set(keyword_scores) | set(vector_scores)
 combined = []
 for doc_id in all_doc_ids:
 ks = keyword_scores.get(doc_id, 0.0)
 vs = vector_scores.get(doc_id, 0.0)
 combined.append({
 "id": doc_id,
 "score": alpha * vs + (1 - alpha) * ks,
 "keyword_score": ks,
 "vector_score": vs,
 })

 combined.sort(key=lambda x: x["score"], reverse=True)
 return combined[:top_k]

 def reciprocal_rank_fusion(
 *ranked_lists: list[tuple[str, float]],
 k: int = 60,
 top_n: int = 10,
 ) -> list[dict]:
 """
 Merge multiple ranked lists, uses RRF (Reciprocal Rank Fusion)

 RRF score = sum over all lists of: 1 / (k + rank)

 Why RRF over weighted combination?
 - No score normalization needed (works on ranks, not raw scores)
 - No alpha tuning needed
 - Robust across different score distributions
 - Used by Elasticsearch, Pinecone, Weaviate under the hood
 """
 rrf_scores: dict[str, float] = defaultdict(float)
 doc_details: dict[str, dict] = {}

 for list_idx, ranked_list in enumerate(ranked_lists):
 for rank, (doc_id, raw_score) in enumerate(ranked_list, start=1):
 rrf_scores[doc_id] += 1.0 / (k + rank)
 if doc_id not in doc_details:
 doc_details[doc_id] = {}
 doc_details[doc_id][f"list_{list_idx}_rank"] = rank
 doc_details[doc_id][f"list_{list_idx}_score"] = raw_score

 results = []
 for doc_id, rrf_score in rrf_scores.items():
 results.append({
 "id": doc_id,
 "rrf_score": round(rrf_score, 6),
 **doc_details[doc_id],
 })

 results.sort(key=lambda x: x["rrf_score"], reverse=True)
 return results[:top_n]

 def hybrid_search_rrf(
 query: str,
 query_embedding: list[float],
 bm25_index: BestMatching25Index,
 vector_index: VectorIndex,
 top_k: int = 10,
 ) -> list[dict]:
 keyword_results = bm25_index.search(query, top_k=top_k * 2)
 vector_results = vector_index.search(query_embedding, top_k=top_k * 2)

 return reciprocal_rank_fusion(keyword_results, vector_results, top_n=top_k)

Một trong những cách retrieval hiệu quả nhất hiện nay là hybrid retrieval — kết hợp giữa keyword search và semantic vector search. Keyword search mạnh ở các truy vấn chính xác như “password policy”, trong khi embedding search lại giỏi hơn ở việc hiểu ý nghĩa và ngữ cảnh câu hỏi.

Khi kết hợp cả hai, hệ thống sẽ vừa chính xác vừa linh hoạt hơn rất nhiều. Các framework như LlamaIndex và LangChain hiện là lựa chọn phổ biến để xây dựng pipeline retrieval theo hướng này.

5. Knowledge base phải được cập nhật liên tục

Một knowledge base tốt không phải thứ “xây xong rồi để đó”.

Theo thời gian, dữ liệu có thể lỗi thời, chính sách thay đổi hoặc embedding model được cập nhật. Nếu không refresh định kỳ, AI sẽ bắt đầu đưa ra các phản hồi không còn chính xác.

Có một khái niệm gọi là selective forgetting — tức chủ động xóa hoặc cập nhật những dữ liệu không còn phù hợp. Các công cụ như DeepEval hoặc TruLens có thể giúp theo dõi chất lượng retrieval và xác định chunk nào đang gây ra câu trả lời sai.

 """
 Knowledge Base Quality Monitoring

 Knowledge base health with the help of automated checks:
 1. Retrieval quality — is it finding the right documents?
 2. Freshness detection — Are documents stale or embeddings drifting?
 3. Unified pipeline — Scheduled monitoring with alerts
 """

 import time
 import json
 import logging
 from datetime import datetime, timedelta
 from dataclasses import dataclass, field
 from typing import Any, Callable

 import numpy as np

 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger("kb_monitor")

 def setup_deepeval_metrics():
 """Define retrieval quality metrics using DeepEval.

 DeepEval provides LLM-evaluated metrics — it uses a judge LLM to score
 whether retrieved context actually helps answer the question.
 """
 from deepeval.metrics import (
 AnswerRelevancyMetric,
 FaithfulnessMetric,
 ContextualPrecisionMetric,
 ContextualRecallMetric,
 )
 from deepeval.test_case import LLMTestCase

 metrics = {
 # Does the answer address the question?
 "relevancy": AnswerRelevancyMetric(threshold=0.7),
 # Is the answer grounded in the retrieved context (no hallucination)?
 "faithfulness": FaithfulnessMetric(threshold=0.7),
 # Are the top-ranked retrieved docs actually relevant?
 "context_precision": ContextualPrecisionMetric(threshold=0.7),
 # Did we retrieve all the docs needed to answer?
 "context_recall": ContextualRecallMetric(threshold=0.7),
 }

 return metrics, LLMTestCase

 def evaluate_retrieval_quality(
 rag_pipeline: Callable,
 test_cases: list[dict],
 ) -> list[dict]:
 """Run a set of test queries through your RAG pipeline and score them.

 Each test case should have:
 - query: the user question
 - expected_answer: ground truth answer (for recall/relevancy)
 """
 from deepeval import evaluate
 from deepeval.test_case import LLMTestCase
 from deepeval.metrics import (
 AnswerRelevancyMetric,
 FaithfulnessMetric,
 ContextualPrecisionMetric,
 ContextualRecallMetric,
 )

 results = []

 for tc in test_cases:
 # Run your actual RAG pipeline
 response = rag_pipeline(tc["query"])

 test_case = LLMTestCase(
 input=tc["query"],
 actual_output=response["answer"],
 expected_output=tc["expected_answer"],
 retrieval_context=response["retrieved_contexts"],
 )

 metrics = [
 AnswerRelevancyMetric(threshold=0.7),
 FaithfulnessMetric(threshold=0.7),
 ContextualPrecisionMetric(threshold=0.7),
 ContextualRecallMetric(threshold=0.7),
 ]

 for metric in metrics:
 metric.measure(test_case)

 results.append({
 "query": tc["query"],
 "scores": {m.__class__.__name__: m.score for m in metrics},
 "passed": all(m.is_successful() for m in metrics),
 })

 return results

 def setup_trulens_monitoring(rag_pipeline: Callable, app_name: str = "my_kb"):
 """Wrap your RAG pipeline with TruLens for continuous feedback logging.

 TruLens records every query + response + retrieved context, then
 runs feedback functions asynchronously to score each interaction.
 """
 from trulens.core import TruSession, Feedback, Select
 from trulens.providers.openai import OpenAI as TruLensOpenAI
 from trulens.apps.custom import TruCustomApp, instrument

 session = TruSession()

 # Feedback provider (uses an LLM to judge quality)
 provider = TruLensOpenAI()

 feedbacks = [
 # Is the response relevant to the query?
 Feedback(provider.relevance)
 .on_input()
 .on_output(),

 # Is the response grounded in retrieved context?
 Feedback(provider.groundedness_measure_with_cot_reasons)
 .on(Select.RecordCalls.retrieve.rets)
 .on_output(),

 # Is the retrieved context relevant to the query?
 Feedback(provider.context_relevance)
 .on_input()
 .on(Select.RecordCalls.retrieve.rets),
 ]

 # Wrap your pipeline — every call is now logged and scored
 @instrument
 class InstrumentedRAG:
 def __init__(self, pipeline):
 self._pipeline = pipeline

 @instrument
 def retrieve(self, query: str) -> list[str]:
 result = self._pipeline(query)
 return result["retrieved_contexts"]

 @instrument
 def query(self, query: str) -> str:
 result = self._pipeline(query)
 return result["answer"]

 instrumented = InstrumentedRAG(rag_pipeline)

 tru_app = TruCustomApp(
 instrumented,
 app_name=app_name,
 feedbacks=feedbacks,
 )

 return tru_app, session

 def get_trulens_dashboard_url(session) -> str:
 """Launch the TruLens dashboard to visualize quality over time."""
 session.run_dashboard(port=8501)
 return "http://localhost:8501"

 @dataclass
 class DocumentFreshness:
 doc_id: str
 last_updated: datetime
 last_embedded: datetime
 source_hash: str # hash of source content at embedding time

 class FreshnessMonitor:
 """Detect stale documents and embedding drift."""

 def __init__(self, staleness_threshold_days: int = 30):
 self.threshold = timedelta(days=staleness_threshold_days)
 self.freshness_records: dict[str, DocumentFreshness] = {}

 def register(self, doc_id: str, source_hash: str) -> None:
 now = datetime.utcnow()
 self.freshness_records[doc_id] = DocumentFreshness(
 doc_id=doc_id,
 last_updated=now,
 last_embedded=now,
 source_hash=source_hash,
 )

 def check_staleness(self) -> dict:
 """Find documents that haven't been re-embedded recently."""
 now = datetime.utcnow()
 stale, fresh = [], []

 for doc_id, record in self.freshness_records.items():
 age = now - record.last_embedded
 if age > self.threshold:
 stale.append({"id": doc_id, "days_stale": age.days})
 else:
 fresh.append(doc_id)

 return {
 "total": len(self.freshness_records),
 "fresh": len(fresh),
 "stale": len(stale),
 "stale_documents": stale,
 }

 def check_content_drift(
 self, doc_id: str, current_source_hash: str
 ) -> bool:
 """Check if source content changed since last embedding."""
 record = self.freshness_records.get(doc_id)
 if not record:
 return True # unknown doc, treat as drifted
 return record.source_hash != current_source_hash

 def detect_embedding_drift(
 old_embeddings: dict[str, list[float]],
 new_embeddings: dict[str, list[float]],
 drift_threshold: float = 0.1,
 ) -> dict:
 """Compare old vs new embeddings for the same documents.

 If your embedding model gets updated (or you switch models),
 existing vectors may no longer be compatible. This detects that.
 """
 drifted = []
 common_ids = set(old_embeddings) & set(new_embeddings)

 for doc_id in common_ids:
 old = np.array(old_embeddings[doc_id])
 new = np.array(new_embeddings[doc_id])

 # cosine distance: 0 = identical, 2 = opposite
 cos_sim = np.dot(old, new) / (np.linalg.norm(old) * np.linalg.norm(new))
 cos_dist = 1 - cos_sim

 if cos_dist > drift_threshold:
 drifted.append({
 "id": doc_id,
 "cosine_distance": round(float(cos_dist), 4),
 })

 return {
 "documents_compared": len(common_ids),
 "drifted": len(drifted),
 "drift_threshold": drift_threshold,
 "drifted_documents": sorted(drifted, key=lambda x: x["cosine_distance"], reverse=True),
 }

6. Ba vấn đề lớn nhất khi xây knowledge base

Vấn đề phổ biến nhất là dữ liệu chất lượng kém. Đây cũng là nguyên nhân khiến AI hallucinate. Ví dụ nổi tiếng là chatbot của Air Canada từng tự “bịa” ra chính sách hoàn tiền không tồn tại.

Một vấn đề khác là retrieval chậm. Nhiều hệ thống AI trả lời đúng nhưng quá lag vì developer chưa tối ưu index hoặc vector storage. Tác giả khuyến nghị nên dùng HNSW hoặc IVF index thay vì flat index để tăng tốc truy xuất.

Ngoài ra, scalability cũng là bài toán lớn. Nhiều đội ngũ ban đầu chọn monolithic architecture để triển khai nhanh, nhưng khi lượng truy vấn tăng mạnh thì CPU và RAM bị quá tải. Theo tác giả, horizontal sharding là hướng phù hợp hơn để scale knowledge base trong dài hạn.

7. Knowledge base không phải nơi “dump dữ liệu”

Cuối cùng, cần lưu ý rằng knowledge base không phải nơi ném toàn bộ dữ liệu vào rồi hy vọng AI tự hiểu mọi thứ. Nó là một tài sản cần được curate và tối ưu liên tục.

Bạn nên bắt đầu từ những tác vụ nhỏ, chẳng hạn chỉ tập trung vào 10 câu hỏi phổ biến nhất trước. Sau khi AI trả lời ổn định và chính xác, mới tiếp tục mở rộng hệ thống. Khác biệt giữa một AI “đoán mò” và một AI “thực sự biết” nằm ở chính quá trình curate dữ liệu có chủ đích này.