AI chỉ mạnh khi nền tảng tri thức phía sau đủ tốt. Một knowledge base được xây dựng bài bản không chỉ giúp mô hình trả lời chính xác hơn mà còn cải thiện đáng kể tốc độ phản hồi — hai điểm yếu mà nhiều AI hiện nay vẫn gặp phải. Theo một nghiên cứu gần đây, nhiều chatbot AI lớn hiện vẫn trả lời sai gần một nửa số truy vấn người dùng đưa ra.
Chính vì vậy, việc xây dựng knowledge base không còn là một phần “phụ trợ”, mà gần như là yếu tố quyết định chất lượng của toàn bộ hệ thống AI.
1. Bắt đầu từ dữ liệu phù hợp, không phải dữ liệu thật nhiều
Một trong những sai lầm phổ biến nhất khi xây knowledge base là cho rằng càng nhiều dữ liệu thì AI càng thông minh. Thực tế, điều này rất dễ dẫn tới tình trạng “garbage in, garbage out” — dữ liệu kém chất lượng sẽ tạo ra kết quả kém chất lượng.
Điều quan trọng không phải số lượng, mà là độ liên quan của dữ liệu với mục tiêu hệ thống. Một knowledge base tốt thường chỉ tập trung vào những nội dung AI thực sự cần để trả lời đúng.
Ví dụ, nếu đang xây chatbot hỗ trợ khách hàng, hệ thống có thể chỉ cần tài liệu chính sách công ty, quy trình xử lý sự cố hoặc hướng dẫn sử dụng sản phẩm. Điều này giúp AI không “bịa” thêm thông tin ngoài phạm vi cho phép.
Lưu ý rằng hiện có xu hướng dùng dữ liệu do AI tạo ra để xây knowledge base cho AI khác. Cách này giúp tăng tốc rất nhanh, nhưng cũng tiềm ẩn rủi ro vì nội dung có thể chứa lỗi, thông tin thừa hoặc diễn đạt quá dài dòng. Vì vậy, mọi dữ liệu AI-generated đều nên được kiểm tra lại trước khi đưa vào hệ thống.
2. Làm sạch và chia nhỏ dữ liệu là bước cực kỳ quan trọng
Sau khi thu thập dữ liệu, bước tiếp theo là làm sạch nội dung. Quá trình này thường bao gồm việc xóa dữ liệu trùng lặp, loại bỏ thông tin lỗi thời, đồng thời chuẩn hóa thuật ngữ và định dạng để toàn bộ knowledge base có tính nhất quán.
Sau đó, dữ liệu sẽ được chia thành các “chunk” nhỏ. Mỗi chunk chỉ nên chứa một ý hoặc một chủ đề rõ ràng để AI dễ tìm kiếm và truy xuất hơn.
Bạn nên chia chunk theo kiểu câu hỏi người dùng thực tế thay vì chia theo cấu trúc tài liệu truyền thống. Ví dụ, thay vì chia theo “Chương quản lý tài khoản”, có thể tách thành các nội dung như:
“Làm sao đổi mật khẩu?” hoặc “Chính sách mật khẩu là gì?”.
Cách tiếp cận này giúp AI phản hồi gần với nhu cầu thật của người dùng hơn rất nhiều.
3. Metadata và vector hóa giúp AI “hiểu” dữ liệu nhanh hơn
Sau khi dữ liệu được chia nhỏ, mỗi chunk thường sẽ được gắn thêm metadata như nguồn dữ liệu, chủ đề, ngày cập nhật hoặc quyền truy cập. Metadata giúp hệ thống lọc và tìm đúng nội dung nhanh hơn thay vì phải quét toàn bộ knowledge base.
Tiếp theo, văn bản sẽ được chuyển thành vector thông qua embedding model như OpenAI v3-Large hoặc BGE-M3. Đây là bước rất quan trọng vì AI xử lý vector nhanh hơn nhiều so với văn bản thô.
Một chunk hoàn chỉnh thường sẽ bao gồm:
- Vector embedding
- Nội dung gốc
- Metadata đi kèm
Đây cũng là nền tảng của hầu hết hệ thống RAG hiện nay.
4. Chọn đúng vector database và tối ưu retrieval
Sau khi vector hóa, dữ liệu thường được lưu trong các vector database như Pinecone, Milvus hoặc Weaviate. Những hệ thống này được thiết kế riêng để truy xuất vector theo ngữ nghĩa. Bạn có thể tải lên dữ liệu vector bằng cách viết một đoạn mã Python đơn giản.
import math
import time
import json
from dataclasses import dataclass, field
from typing import Any
import numpy as np
# Vector Normalization + Metadata
def normalize_l2(vector: list[float]) -> list[float]:
"""
Return an L2-normalized copy of `vec`.
Many vector stores use dot-product similarity. If you normalize vectors to
unit length, dot-product becomes equivalent to cosine similarity.
"""
arr = np.array(vector, dtype=np.float32)
norm = np.linalg.norm(arr)
if norm == 0:
return vector
return (arr / norm).tolist()
def prepare_record(
doc_id: str,
embedding: list[float],
text: str,
source: str,
extra_metadata: dict[str, Any] | None = None,
) -> dict:
"""
Prepare a single record for vector DB upsert.
Metadata serves two purposes:
- Filtering: narrow down search to a subset
"""
metadata = {
"source": source,
"text_preview": text[:500],
"char_count": len(text),
}
if extra_metadata:
metadata.update(extra_metadata)
return {
"id": doc_id,
"values": normalize_l2(embedding),
"metadata": metadata,
}
# Vector Quantization
# Scalar Quantization / SQ
def scalar_quantization(input_vec) -> dict:
"""
This funtion demonstrates
how to compress float32 input_vec to uint8
"""
input_arr = np.array(input_vec, dtype=np.float32)
min, max = input_arr.min(), input_arr.max()
range = (max - min)
if range == 0:
quantized = np.zeros_like(arr, dtype=np.uint8)
else:
quantized = ((input_arr - min) / range * 255).astype(np.uint8)
return {
"quantized": quantized.tolist(),
"min": float(min),
"max": float(max),
}
def scalar_dequantization(record: dict) -> list[float]:
"""
You can Reconstruct the original vector
by approximate float32 vector from uint8.
"""
arr = np.array(record["quantized"], dtype=np.float32)
return (arr / 255 * (record["max"] - record["min"]) + record["min"]).tolist()
# Product Quantization / PQ
def train_product_quantizer( vectors, num_subvectors: int = 8, num_centroids: int = 256, max_iterations: int = 20) -> list:
"""
This function demonstrates
split vector into subvectors, cluster each independently
"""
from sklearn.cluster import KMeans
dim = vectors.shape[1]
assert dim % num_subvectors == 0, "dim must be divisible by num_subvectors"
sub_dim = dim // num_subvectors
codebooks = []
for i in range(num_subvectors):
sub_vectors = vectors[:, i * sub_dim : (i + 1) * sub_dim]
kmeans = KMeans(n_clusters=num_centroids, max_iter=max_iterations, n_init=1)
kmeans.fit(sub_vectors)
codebooks.append(kmeans.cluster_centers_)
return codebooks
def pq_encode(vector: np.ndarray, codebooks: list[np.ndarray]) -> list[int]:
"""
Encode a single vector into PQ codes (one uint8 per subvector)
"""
num_subvectors = len(codebooks)
sub_dim = len(vector) // num_subvectors
codes = []
for i, codebook in enumerate(codebooks):
sub_vec = vector[i * sub_dim : (i + 1) * sub_dim]
distances = np.linalg.norm(codebook - sub_vec, axis=1)
codes.append(int(np.argmin(distances)))
return codes
def pq_decode(codes: list[int], codebooks: list[np.ndarray]) -> np.ndarray:
"""
Reconstruct approximate vector from PQ codes
"""
return np.concatenate(
[codebook[code] for code, codebook in zip(codes, codebooks)]
)Nhiều developer thường chỉ tập trung “làm cho chạy được” mà quên tối ưu retrieval. Trong khi thực tế, người dùng không chỉ muốn AI trả lời đúng mà còn muốn phản hồi gần như ngay lập tức.
Để truy xuất dữ liệu từ cơ sở dữ liệu vector, bạn có thể sử dụng các framework điều phối như LlamaIndex và LangChain.
LlamaIndex có thể duyệt qua cơ sở dữ liệu vector nhanh hơn và tìm đến chính xác đoạn dữ liệu chứa nội dung liên quan đến truy vấn của người dùng.
Sau đó, LangChain sẽ lấy dữ liệu từ đoạn đó và chuyển đổi nó theo truy vấn của người dùng. Ví dụ: tóm tắt văn bản hoặc viết email từ dữ liệu đó.
"""
Hybrid Retrieval: Take benefits from both keyword search and vector similarity
Where each approach shines:
- Keywords: looks for exact matches, but will miss searches with synonym
- Embeddings: has advantage of capturing the meaning, but there is possibility of missing exact keyword
Hybrid is a combination of both to get the best of each.
"""
import math
from collections import defaultdict
from dataclasses import dataclass
import numpy as np
@dataclass
class Document:
id: str
text: str
embedding: list[float]
class BestMatching25Index:
def __init__(self, k1: float = 1.5, b: float = 0.75):
# Here k1 is the term frequency saturation limit
# and b is length of normalization
self.k1 = k1
self.b = b
self.doc_lengths: dict[str, int] = {}
self.avg_doc_length: float = 0
self.doc_freqs: dict[str, int] = {}
self.term_freqs: dict[str, dict[str, int]] = {}
self.corpus_size: int = 0
def _tokenize(self, text: str) -> list[str]:
return text.lower().split()
def index(self, documents: list[Document]) -> None:
self.corpus_size = len(documents)
for doc in documents:
tokens = self._tokenize(doc.text)
self.doc_lengths[doc.id] = len(tokens)
self.term_freqs[doc.id] = {}
seen_terms: set[str] = set()
for token in tokens:
self.term_freqs[doc.id][token] = self.term_freqs[doc.id].get(token, 0) + 1
if token not in seen_terms:
self.doc_freqs[token] = self.doc_freqs.get(token, 0) + 1
seen_terms.add(token)
self.avg_doc_length = sum(self.doc_lengths.values()) / self.corpus_size
def score(self, query: str, doc_id: str) -> float:
query_terms = self._tokenize(query)
doc_len = self.doc_lengths[doc_id]
score = 0.0
for term in query_terms:
if term not in self.doc_freqs or term not in self.term_freqs.get(doc_id, {}):
continue
tf = self.term_freqs[doc_id][term]
df = self.doc_freqs[term]
idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
tf_norm = (tf * (self.k1 + 1)) / (
tf + self.k1 * (1 - self.b + self.b * doc_len / self.avg_doc_length)
)
score += idf * tf_norm
return score
def search(self, query: str, top_k: int = 10) -> list[tuple[str, float]]:
scores = [
(doc_id, self.score(query, doc_id))
for doc_id in self.doc_lengths
]
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
class VectorIndex:
"""This class implements the smart search using the hybrid search.
The index function normalize and stores the document
search implements a cosine similarity search
hybrid_search_weighted merges BM25 index and vector index using weighted average
Reciprocal_rank_fusion Combines the results in an efficient way
"""
def __init__(self):
self.documents: dict[str, np.ndarray] = {}
def index(self, documents: list[Document]) -> None:
for doc in documents:
arr = np.array(doc.embedding, dtype=np.float32)
norm = np.linalg.norm(arr)
self.documents[doc.id] = arr / norm if norm > 0 else arr
def search(self, query_embedding: list[float], top_k: int = 10) -> list[tuple[str, float]]:
q = np.array(query_embedding, dtype=np.float32)
q = q / np.linalg.norm(q)
scores = [
(doc_id, float(np.dot(q, emb)))
for doc_id, emb in self.documents.items()
]
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
def hybrid_search_weighted(
query: str,
query_embedding: list[float],
bm25_index: BestMatching25Index,
vector_index: VectorIndex,
alpha: float = 0.5,
top_k: int = 10,
) -> list[dict]:
"""Combine keyword and vector scores with a tunable weight.
alpha = 1.0 → pure vector search
alpha = 0.0 → pure keyword search
alpha = 0.5 → equal weight (good starting point)
"""
keyword_results = bm25_index.search(query, top_k=top_k * 2)
vector_results = vector_index.search(query_embedding, top_k=top_k * 2)
# Normalize (min-max) each score list to [0, 1]
def normalize_scores(results: list[tuple[str, float]]) -> dict[str, float]:
if not results:
return {}
scores = [s for _, s in results]
min_s, max_s = min(scores), max(scores)
rng = max_s - min_s
if rng == 0:
return {doc_id: 1.0 for doc_id, _ in results}
return {doc_id: (s - min_s) / rng for doc_id, s in results}
keyword_scores = normalize_scores(keyword_results)
vector_scores = normalize_scores(vector_results)
# Merge
all_doc_ids = set(keyword_scores) | set(vector_scores)
combined = []
for doc_id in all_doc_ids:
ks = keyword_scores.get(doc_id, 0.0)
vs = vector_scores.get(doc_id, 0.0)
combined.append({
"id": doc_id,
"score": alpha * vs + (1 - alpha) * ks,
"keyword_score": ks,
"vector_score": vs,
})
combined.sort(key=lambda x: x["score"], reverse=True)
return combined[:top_k]
def reciprocal_rank_fusion(
*ranked_lists: list[tuple[str, float]],
k: int = 60,
top_n: int = 10,
) -> list[dict]:
"""
Merge multiple ranked lists, uses RRF (Reciprocal Rank Fusion)
RRF score = sum over all lists of: 1 / (k + rank)
Why RRF over weighted combination?
- No score normalization needed (works on ranks, not raw scores)
- No alpha tuning needed
- Robust across different score distributions
- Used by Elasticsearch, Pinecone, Weaviate under the hood
"""
rrf_scores: dict[str, float] = defaultdict(float)
doc_details: dict[str, dict] = {}
for list_idx, ranked_list in enumerate(ranked_lists):
for rank, (doc_id, raw_score) in enumerate(ranked_list, start=1):
rrf_scores[doc_id] += 1.0 / (k + rank)
if doc_id not in doc_details:
doc_details[doc_id] = {}
doc_details[doc_id][f"list_{list_idx}_rank"] = rank
doc_details[doc_id][f"list_{list_idx}_score"] = raw_score
results = []
for doc_id, rrf_score in rrf_scores.items():
results.append({
"id": doc_id,
"rrf_score": round(rrf_score, 6),
**doc_details[doc_id],
})
results.sort(key=lambda x: x["rrf_score"], reverse=True)
return results[:top_n]
def hybrid_search_rrf(
query: str,
query_embedding: list[float],
bm25_index: BestMatching25Index,
vector_index: VectorIndex,
top_k: int = 10,
) -> list[dict]:
keyword_results = bm25_index.search(query, top_k=top_k * 2)
vector_results = vector_index.search(query_embedding, top_k=top_k * 2)
return reciprocal_rank_fusion(keyword_results, vector_results, top_n=top_k)Một trong những cách retrieval hiệu quả nhất hiện nay là hybrid retrieval — kết hợp giữa keyword search và semantic vector search. Keyword search mạnh ở các truy vấn chính xác như “password policy”, trong khi embedding search lại giỏi hơn ở việc hiểu ý nghĩa và ngữ cảnh câu hỏi.
Khi kết hợp cả hai, hệ thống sẽ vừa chính xác vừa linh hoạt hơn rất nhiều. Các framework như LlamaIndex và LangChain hiện là lựa chọn phổ biến để xây dựng pipeline retrieval theo hướng này.
5. Knowledge base phải được cập nhật liên tục
Một knowledge base tốt không phải thứ “xây xong rồi để đó”.
Theo thời gian, dữ liệu có thể lỗi thời, chính sách thay đổi hoặc embedding model được cập nhật. Nếu không refresh định kỳ, AI sẽ bắt đầu đưa ra các phản hồi không còn chính xác.
Có một khái niệm gọi là selective forgetting — tức chủ động xóa hoặc cập nhật những dữ liệu không còn phù hợp. Các công cụ như DeepEval hoặc TruLens có thể giúp theo dõi chất lượng retrieval và xác định chunk nào đang gây ra câu trả lời sai.
"""
Knowledge Base Quality Monitoring
Knowledge base health with the help of automated checks:
1. Retrieval quality — is it finding the right documents?
2. Freshness detection — Are documents stale or embeddings drifting?
3. Unified pipeline — Scheduled monitoring with alerts
"""
import time
import json
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Any, Callable
import numpy as np
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kb_monitor")
def setup_deepeval_metrics():
"""Define retrieval quality metrics using DeepEval.
DeepEval provides LLM-evaluated metrics — it uses a judge LLM to score
whether retrieved context actually helps answer the question.
"""
from deepeval.metrics import (
AnswerRelevancyMetric,
FaithfulnessMetric,
ContextualPrecisionMetric,
ContextualRecallMetric,
)
from deepeval.test_case import LLMTestCase
metrics = {
# Does the answer address the question?
"relevancy": AnswerRelevancyMetric(threshold=0.7),
# Is the answer grounded in the retrieved context (no hallucination)?
"faithfulness": FaithfulnessMetric(threshold=0.7),
# Are the top-ranked retrieved docs actually relevant?
"context_precision": ContextualPrecisionMetric(threshold=0.7),
# Did we retrieve all the docs needed to answer?
"context_recall": ContextualRecallMetric(threshold=0.7),
}
return metrics, LLMTestCase
def evaluate_retrieval_quality(
rag_pipeline: Callable,
test_cases: list[dict],
) -> list[dict]:
"""Run a set of test queries through your RAG pipeline and score them.
Each test case should have:
- query: the user question
- expected_answer: ground truth answer (for recall/relevancy)
"""
from deepeval import evaluate
from deepeval.test_case import LLMTestCase
from deepeval.metrics import (
AnswerRelevancyMetric,
FaithfulnessMetric,
ContextualPrecisionMetric,
ContextualRecallMetric,
)
results = []
for tc in test_cases:
# Run your actual RAG pipeline
response = rag_pipeline(tc["query"])
test_case = LLMTestCase(
input=tc["query"],
actual_output=response["answer"],
expected_output=tc["expected_answer"],
retrieval_context=response["retrieved_contexts"],
)
metrics = [
AnswerRelevancyMetric(threshold=0.7),
FaithfulnessMetric(threshold=0.7),
ContextualPrecisionMetric(threshold=0.7),
ContextualRecallMetric(threshold=0.7),
]
for metric in metrics:
metric.measure(test_case)
results.append({
"query": tc["query"],
"scores": {m.__class__.__name__: m.score for m in metrics},
"passed": all(m.is_successful() for m in metrics),
})
return results
def setup_trulens_monitoring(rag_pipeline: Callable, app_name: str = "my_kb"):
"""Wrap your RAG pipeline with TruLens for continuous feedback logging.
TruLens records every query + response + retrieved context, then
runs feedback functions asynchronously to score each interaction.
"""
from trulens.core import TruSession, Feedback, Select
from trulens.providers.openai import OpenAI as TruLensOpenAI
from trulens.apps.custom import TruCustomApp, instrument
session = TruSession()
# Feedback provider (uses an LLM to judge quality)
provider = TruLensOpenAI()
feedbacks = [
# Is the response relevant to the query?
Feedback(provider.relevance)
.on_input()
.on_output(),
# Is the response grounded in retrieved context?
Feedback(provider.groundedness_measure_with_cot_reasons)
.on(Select.RecordCalls.retrieve.rets)
.on_output(),
# Is the retrieved context relevant to the query?
Feedback(provider.context_relevance)
.on_input()
.on(Select.RecordCalls.retrieve.rets),
]
# Wrap your pipeline — every call is now logged and scored
@instrument
class InstrumentedRAG:
def __init__(self, pipeline):
self._pipeline = pipeline
@instrument
def retrieve(self, query: str) -> list[str]:
result = self._pipeline(query)
return result["retrieved_contexts"]
@instrument
def query(self, query: str) -> str:
result = self._pipeline(query)
return result["answer"]
instrumented = InstrumentedRAG(rag_pipeline)
tru_app = TruCustomApp(
instrumented,
app_name=app_name,
feedbacks=feedbacks,
)
return tru_app, session
def get_trulens_dashboard_url(session) -> str:
"""Launch the TruLens dashboard to visualize quality over time."""
session.run_dashboard(port=8501)
return "http://localhost:8501"
@dataclass
class DocumentFreshness:
doc_id: str
last_updated: datetime
last_embedded: datetime
source_hash: str # hash of source content at embedding time
class FreshnessMonitor:
"""Detect stale documents and embedding drift."""
def __init__(self, staleness_threshold_days: int = 30):
self.threshold = timedelta(days=staleness_threshold_days)
self.freshness_records: dict[str, DocumentFreshness] = {}
def register(self, doc_id: str, source_hash: str) -> None:
now = datetime.utcnow()
self.freshness_records[doc_id] = DocumentFreshness(
doc_id=doc_id,
last_updated=now,
last_embedded=now,
source_hash=source_hash,
)
def check_staleness(self) -> dict:
"""Find documents that haven't been re-embedded recently."""
now = datetime.utcnow()
stale, fresh = [], []
for doc_id, record in self.freshness_records.items():
age = now - record.last_embedded
if age > self.threshold:
stale.append({"id": doc_id, "days_stale": age.days})
else:
fresh.append(doc_id)
return {
"total": len(self.freshness_records),
"fresh": len(fresh),
"stale": len(stale),
"stale_documents": stale,
}
def check_content_drift(
self, doc_id: str, current_source_hash: str
) -> bool:
"""Check if source content changed since last embedding."""
record = self.freshness_records.get(doc_id)
if not record:
return True # unknown doc, treat as drifted
return record.source_hash != current_source_hash
def detect_embedding_drift(
old_embeddings: dict[str, list[float]],
new_embeddings: dict[str, list[float]],
drift_threshold: float = 0.1,
) -> dict:
"""Compare old vs new embeddings for the same documents.
If your embedding model gets updated (or you switch models),
existing vectors may no longer be compatible. This detects that.
"""
drifted = []
common_ids = set(old_embeddings) & set(new_embeddings)
for doc_id in common_ids:
old = np.array(old_embeddings[doc_id])
new = np.array(new_embeddings[doc_id])
# cosine distance: 0 = identical, 2 = opposite
cos_sim = np.dot(old, new) / (np.linalg.norm(old) * np.linalg.norm(new))
cos_dist = 1 - cos_sim
if cos_dist > drift_threshold:
drifted.append({
"id": doc_id,
"cosine_distance": round(float(cos_dist), 4),
})
return {
"documents_compared": len(common_ids),
"drifted": len(drifted),
"drift_threshold": drift_threshold,
"drifted_documents": sorted(drifted, key=lambda x: x["cosine_distance"], reverse=True),
}6. Ba vấn đề lớn nhất khi xây knowledge base
Vấn đề phổ biến nhất là dữ liệu chất lượng kém. Đây cũng là nguyên nhân khiến AI hallucinate. Ví dụ nổi tiếng là chatbot của Air Canada từng tự “bịa” ra chính sách hoàn tiền không tồn tại.
Một vấn đề khác là retrieval chậm. Nhiều hệ thống AI trả lời đúng nhưng quá lag vì developer chưa tối ưu index hoặc vector storage. Tác giả khuyến nghị nên dùng HNSW hoặc IVF index thay vì flat index để tăng tốc truy xuất.
Ngoài ra, scalability cũng là bài toán lớn. Nhiều đội ngũ ban đầu chọn monolithic architecture để triển khai nhanh, nhưng khi lượng truy vấn tăng mạnh thì CPU và RAM bị quá tải. Theo tác giả, horizontal sharding là hướng phù hợp hơn để scale knowledge base trong dài hạn.
7. Knowledge base không phải nơi “dump dữ liệu”
Cuối cùng, cần lưu ý rằng knowledge base không phải nơi ném toàn bộ dữ liệu vào rồi hy vọng AI tự hiểu mọi thứ. Nó là một tài sản cần được curate và tối ưu liên tục.
Bạn nên bắt đầu từ những tác vụ nhỏ, chẳng hạn chỉ tập trung vào 10 câu hỏi phổ biến nhất trước. Sau khi AI trả lời ổn định và chính xác, mới tiếp tục mở rộng hệ thống. Khác biệt giữa một AI “đoán mò” và một AI “thực sự biết” nằm ở chính quá trình curate dữ liệu có chủ đích này.
Hướng dẫn AI
Học IT










Hàm Excel
Download