|
|
|
""" |
|
Validation script for the quantized ONNX LazarusNLP IndoBERT model. |
|
Checks model integrity, performance, and accuracy. |
|
""" |
|
|
|
import onnxruntime as ort |
|
from transformers import AutoTokenizer |
|
import numpy as np |
|
import json |
|
import os |
|
import time |
|
import sys |
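
# Usage (assumed invocation; the filename is whatever this script is saved
# as): run from the directory containing model.onnx and the tokenizer files,
# since every path below is resolved relative to the current working
# directory.
#
#   python validate_onnx_model.py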
|
|
|
def check_files():
    """Check that all required files are present."""
    print("🔍 Checking required files...")

    required_files = [
        "model.onnx",
        "tokenizer.json",
        "tokenizer_config.json",
        "special_tokens_map.json",
        "vocab.txt",
        "config.json",
        "export_config.json",  # read later by check_config_consistency()
        "README.md"
    ]

    missing_files = []
    file_sizes = {}

    for file in required_files:
        if os.path.exists(file):
            file_sizes[file] = os.path.getsize(file)
            print(f"✅ {file} ({file_sizes[file] / (1024 * 1024):.1f} MB)")
        else:
            missing_files.append(file)
            print(f"❌ {file} - MISSING")

    if missing_files:
        print(f"\n❌ Missing files: {missing_files}")
        return False, {}

    print("✅ All required files present")
    return True, file_sizes
|
|
|
def check_model_loading():
    """Test model and tokenizer loading."""
    print("\n🔄 Testing model loading...")

    try:
        # Load the tokenizer from the current directory (files checked above).
        start_time = time.time()
        tokenizer = AutoTokenizer.from_pretrained("./")
        tokenizer_time = time.time() - start_time
        print(f"✅ Tokenizer loaded ({tokenizer_time:.3f}s)")

        # Create the ONNX Runtime inference session.
        start_time = time.time()
        session = ort.InferenceSession("model.onnx")
        model_time = time.time() - start_time
        print(f"✅ ONNX model loaded ({model_time:.3f}s)")

        # Report the graph's declared inputs and outputs.
        inputs = session.get_inputs()
        outputs = session.get_outputs()
        print(f"✅ Model inputs: {[inp.name for inp in inputs]}")
        print(f"✅ Model outputs: {[out.name for out in outputs]}")

        return True, session, tokenizer

    except Exception as e:
        print(f"❌ Model loading failed: {e}")
        return False, None, None
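
# Note: ort.InferenceSession picks the default execution provider. For
# reproducible CPU-only timings one could pin it explicitly, e.g.:
#
#   session = ort.InferenceSession("model.onnx", providers=["CPUExecutionProvider"])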
|
|
|
def test_basic_inference(session, tokenizer):
    """Test basic model inference."""
    print("\n🧪 Testing basic inference...")

    test_texts = [
        "Halo",
        "Ini adalah tes sederhana.",
        "Teknologi AI berkembang pesat di Indonesia.",
        "Model machine learning membantu analisis data besar untuk memberikan insight yang berharga."
    ]

    results = []

    for i, text in enumerate(test_texts):
        try:
            inputs = tokenizer(text, return_tensors="np", padding=True, truncation=True)

            # Run inference. If the ONNX export also declares a
            # token_type_ids input, it would need to be added to this feed.
            start_time = time.time()
            outputs = session.run(None, {
                'input_ids': inputs['input_ids'],
                'attention_mask': inputs['attention_mask']
            })
            inference_time = time.time() - start_time

            embeddings = outputs[0]
            token_count = inputs['input_ids'].shape[1]

            # Record shape, timing, and numerical sanity checks.
            results.append({
                'text': text,
                'tokens': token_count,
                'output_shape': embeddings.shape,
                'inference_time': inference_time,
                'has_nan': bool(np.isnan(embeddings).any()),
                'has_inf': bool(np.isinf(embeddings).any()),
                'output_range': [float(embeddings.min()), float(embeddings.max())]
            })

            print(f"✅ Test {i+1}: {token_count} tokens → {embeddings.shape} ({inference_time:.4f}s)")

        except Exception as e:
            print(f"❌ Test {i+1} failed: {e}")
            return False, []

    return True, results
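
# For reference only: downstream users of an embedding model typically
# mean-pool the token embeddings with the attention mask to get one vector
# per sentence. A minimal numpy sketch (not part of the validation flow):
#
#   mask = inputs["attention_mask"][..., None]                  # (batch, seq, 1)
#   sentence_vecs = (embeddings * mask).sum(axis=1) / mask.sum(axis=1)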
|
|
|
def test_batch_processing(session, tokenizer):
    """Test batch processing capability."""
    print("\n📦 Testing batch processing...")

    batch_texts = [
        "Kalimat pertama untuk tes batch.",
        "Ini adalah kalimat kedua yang sedikit lebih panjang.",
        "Kalimat ketiga dengan panjang yang berbeda lagi untuk menguji padding.",
        "Terakhir, kalimat keempat."
    ]

    try:
        # Tokenize all texts at once; padding aligns them to a common length.
        inputs = tokenizer(batch_texts, return_tensors="np", padding=True, truncation=True)

        start_time = time.time()
        outputs = session.run(None, {
            'input_ids': inputs['input_ids'],
            'attention_mask': inputs['attention_mask']
        })
        batch_time = time.time() - start_time

        embeddings = outputs[0]

        print(f"✅ Batch shape: {embeddings.shape}")
        print(f"✅ Batch time: {batch_time:.4f}s")
        print(f"✅ Avg per item: {batch_time/len(batch_texts):.4f}s")

        # Every item in the batch must be free of NaN/Inf values.
        for i in range(len(batch_texts)):
            item_embedding = embeddings[i]
            if np.isnan(item_embedding).any() or np.isinf(item_embedding).any():
                print(f"❌ Batch item {i} has invalid values")
                return False

        print("✅ All batch items valid")
        return True

    except Exception as e:
        print(f"❌ Batch processing failed: {e}")
        return False
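
# Batched inference with padded inputs only works if the ONNX graph was
# exported with dynamic batch and sequence axes (see the dynamic_axes flag
# checked in check_config_consistency below); a fixed-shape export would
# fail this test.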
|
|
|
def test_edge_cases(session, tokenizer):
    """Test edge cases and error handling."""
    print("\n🔧 Testing edge cases...")

    edge_cases = [
        ("Empty string", ""),
        ("Single character", "a"),
        ("Numbers only", "123456789"),
        ("Punctuation", "!!!???..."),
        ("Mixed script", "Hello dunia 123 !@#"),
        ("Very long", "Kata " * 100),
        ("Special tokens", "[CLS] [SEP] [MASK] [PAD] [UNK]")
    ]

    passed = 0
    total = len(edge_cases)

    for name, text in edge_cases:
        try:
            inputs = tokenizer(text, return_tensors="np", padding=True, truncation=True)
            outputs = session.run(None, {
                'input_ids': inputs['input_ids'],
                'attention_mask': inputs['attention_mask']
            })

            embeddings = outputs[0]

            # Expect a batch of 1 and the base model's hidden size of 768.
            if embeddings.shape[0] == 1 and embeddings.shape[2] == 768:
                if not (np.isnan(embeddings).any() or np.isinf(embeddings).any()):
                    print(f"✅ {name}: {embeddings.shape}")
                    passed += 1
                else:
                    print(f"❌ {name}: Invalid values (NaN/Inf)")
            else:
                print(f"❌ {name}: Wrong shape {embeddings.shape}")

        except Exception as e:
            print(f"❌ {name}: {e}")

    print(f"\n✅ Edge cases passed: {passed}/{total}")
    return passed == total
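
# Even the "Empty string" case yields at least two tokens after
# tokenization ([CLS] and [SEP]), so the model always receives a non-empty
# sequence.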
|
|
|
def performance_benchmark(session, tokenizer):
    """Run performance benchmark."""
    print("\n⚡ Performance benchmark...")

    test_cases = [
        ("Short (5 tokens)", "Halo dunia!"),
        ("Medium (15 tokens)", "Teknologi AI berkembang sangat pesat di era digital modern."),
        ("Long (50+ tokens)", " ".join(["Kalimat panjang dengan banyak kata untuk menguji performa model dalam memproses teks yang lebih kompleks dan detail."] * 2))
    ]

    benchmark_results = {}

    for name, text in test_cases:
        times = []
        token_count = len(tokenizer.encode(text))

        # Warm-up run: the first call pays one-time initialization costs,
        # so it is excluded from the timed loop below.
        inputs = tokenizer(text, return_tensors="np", padding=True, truncation=True)
        session.run(None, {
            'input_ids': inputs['input_ids'],
            'attention_mask': inputs['attention_mask']
        })

        # Timed runs: only session.run is measured; tokenization stays
        # outside the timer.
        for _ in range(20):
            inputs = tokenizer(text, return_tensors="np", padding=True, truncation=True)

            start_time = time.time()
            outputs = session.run(None, {
                'input_ids': inputs['input_ids'],
                'attention_mask': inputs['attention_mask']
            })
            times.append(time.time() - start_time)

        # Cast to plain floats so the results serialize cleanly to JSON.
        avg_time = float(np.mean(times))
        std_time = float(np.std(times))
        tokens_per_sec = token_count / avg_time

        benchmark_results[name] = {
            'avg_time': avg_time,
            'std_time': std_time,
            'token_count': token_count,
            'tokens_per_sec': tokens_per_sec
        }

        print(f"✅ {name}: {avg_time:.4f}s ± {std_time:.4f}s ({tokens_per_sec:.1f} tokens/s)")

    return benchmark_results
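
# Timing caveat: time.time() has coarse resolution on some platforms;
# time.perf_counter() would be the stricter choice. The numbers here are
# indicative, not rigorous benchmarks.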
|
|
|
def check_config_consistency():
    """Check configuration file consistency."""
    print("\n🔧 Checking configuration consistency...")

    try:
        with open("config.json", "r") as f:
            config = json.load(f)

        with open("tokenizer_config.json", "r") as f:
            tokenizer_config = json.load(f)

        with open("export_config.json", "r") as f:
            export_config = json.load(f)

        issues = []

        # The tokenizer's maximum length should match the model's position
        # embeddings, or long inputs get truncated inconsistently.
        model_max_pos = config.get("max_position_embeddings", 512)
        tokenizer_max = tokenizer_config.get("model_max_length", 512)

        if model_max_pos != tokenizer_max:
            issues.append(f"Max length mismatch: model={model_max_pos}, tokenizer={tokenizer_max}")

        # Unlimited-length export only makes sense with dynamic axes.
        unlimited = export_config.get("unlimited_length", False)
        dynamic_axes = export_config.get("dynamic_axes", False)

        if unlimited and not dynamic_axes:
            issues.append("Unlimited length enabled but dynamic_axes is False")

        # A quantized model's config should record how it was quantized.
        if "quantization" not in config:
            issues.append("Missing quantization information in config")

        if issues:
            for issue in issues:
                print(f"⚠️ {issue}")
        else:
            print("✅ All configurations consistent")

        return len(issues) == 0

    except Exception as e:
        print(f"❌ Config check failed: {e}")
        return False
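
# For reference, the export_config.json consumed above is assumed to carry
# at least these keys (inferred from the lookups in this function, not an
# authoritative schema):
#
#   {
#     "unlimited_length": true,
#     "dynamic_axes": true
#   }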
|
|
|
def generate_validation_report(results):
    """Generate validation report."""
    print("\n📊 VALIDATION REPORT")
    print("=" * 60)

    all_passed = all([
        results.get('files_ok', False),
        results.get('loading_ok', False),
        results.get('inference_ok', False),
        results.get('batch_ok', False),
        results.get('edge_cases_ok', False),
        results.get('config_ok', False)
    ])

    status = "✅ PASSED" if all_passed else "❌ FAILED"
    print(f"Overall Status: {status}")

    print(f"\nFile Check: {'✅ PASSED' if results.get('files_ok') else '❌ FAILED'}")
    print(f"Model Loading: {'✅ PASSED' if results.get('loading_ok') else '❌ FAILED'}")
    print(f"Basic Inference: {'✅ PASSED' if results.get('inference_ok') else '❌ FAILED'}")
    print(f"Batch Processing: {'✅ PASSED' if results.get('batch_ok') else '❌ FAILED'}")
    print(f"Edge Cases: {'✅ PASSED' if results.get('edge_cases_ok') else '❌ FAILED'}")
    print(f"Config Consistency: {'✅ PASSED' if results.get('config_ok') else '❌ FAILED'}")

    if 'benchmark' in results:
        print("\n⚡ PERFORMANCE SUMMARY")
        for name, data in results['benchmark'].items():
            print(f"{name}: {data['avg_time']:.4f}s ({data['tokens_per_sec']:.1f} tokens/s)")

    if 'file_sizes' in results:
        total_size = sum(results['file_sizes'].values()) / (1024 * 1024)
        print(f"\n📁 Total model size: {total_size:.1f} MB")

    print("=" * 60)

    return all_passed
|
|
|
def main():
    """Run complete model validation."""
    print("🚀 LazarusNLP IndoBERT ONNX - Model Validation")
    print("=" * 60)

    results = {}

    # Step 1: required files.
    files_ok, file_sizes = check_files()
    results['files_ok'] = files_ok
    results['file_sizes'] = file_sizes

    if not files_ok:
        print("\n❌ Validation failed: Missing required files")
        return False

    # Step 2: model and tokenizer loading.
    loading_ok, session, tokenizer = check_model_loading()
    results['loading_ok'] = loading_ok

    if not loading_ok:
        print("\n❌ Validation failed: Model loading error")
        return False

    # Step 3: single-text inference.
    inference_ok, inference_results = test_basic_inference(session, tokenizer)
    results['inference_ok'] = inference_ok
    results['inference_results'] = inference_results

    # Step 4: batch processing.
    batch_ok = test_batch_processing(session, tokenizer)
    results['batch_ok'] = batch_ok

    # Step 5: edge cases.
    edge_cases_ok = test_edge_cases(session, tokenizer)
    results['edge_cases_ok'] = edge_cases_ok

    # Step 6: performance benchmark.
    benchmark = performance_benchmark(session, tokenizer)
    results['benchmark'] = benchmark

    # Step 7: configuration consistency.
    config_ok = check_config_consistency()
    results['config_ok'] = config_ok

    # Summarize and persist the results. default=str handles values (numpy
    # scalars, shape tuples) that json cannot serialize natively.
    validation_passed = generate_validation_report(results)

    with open("validation_results.json", "w") as f:
        json.dump(results, f, indent=2, default=str)

    print("\n💾 Validation results saved to validation_results.json")

    if validation_passed:
        print("🎉 Model validation completed successfully!")
        return True
    else:
        print("❌ Model validation failed!")
        return False
|
|
|
if __name__ == "__main__": |
|
success = main() |
|
sys.exit(0 if success else 1) |