In previous posts, we defined our architecture and selected our technology stack. Now is the time to build the “Brain” of the operation. This post covers the deployment of the core AI services on our GPU-enabled virtual machine.
We will configure two main components:
- vLLM Service: The high-performance inference engine serving the Qwen2-VL model.
- Unified AI Middleware: A Python/FastAPI service that bridges the gap between our Java application and the raw AI model, handling logic, retraining, and “Reset” capabilities.
1. Initial Server Setup
First, we launch our VM based on the custom AI image we built earlier. We verify networking and prepare the directory structure for logs and scripts.
# Verify Hostname and IP
hostname
# Output: ai.datachronicles.net
hostname -i
# Output: 10.0.1.165 (Ensure this matches your Java App's config)
# Create the application directory
mkdir -p /opt/ai
# Create the shared log file
touch /var/log/ai.log
2. Directory Strategy & Logic (Crucial)
The system relies on a strict folder structure on the shared NFS mount (/po) to manage the data lifecycle.
The Workflow:
- /po/incoming: Where the Java app drops new raw PDFs for the AI to read.
- /po/review: Where processed files sit while waiting for human action.
- /po/batch: The Archive. Once a human reviews a file, it is moved here for permanent storage.
- /po/training: The Active Staging Area.
  - The Logic: When “Retrain” is clicked, the Java application clears this folder completely.
  - The Select: It then scans /po/batch and copies only the files marked as "modified": true (actual human corrections) into /po/training.
  - The Result: The AI script trains strictly on this clean, high-value dataset.
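For illustration, here is a minimal Python sketch of that staging step. The production logic lives in the Java orchestrator; the assumption here is that the "modified" flag sits inside each review JSON.

import glob
import json
import os
import shutil

BATCH_DIR = "/po/batch"
TRAINING_DIR = "/po/training"

def stage_training_batch() -> int:
    """Clear /po/training, then copy only human-corrected pairs from /po/batch."""
    # 1. Wipe the staging area completely (the "Retrain" click)
    shutil.rmtree(TRAINING_DIR, ignore_errors=True)
    os.makedirs(TRAINING_DIR, exist_ok=True)
    # 2. Copy only pairs whose JSON carries "modified": true
    staged = 0
    for json_path in glob.glob(os.path.join(BATCH_DIR, "*.json")):
        with open(json_path) as f:
            record = json.load(f)
        if not record.get("modified"):
            continue
        pdf_path = json_path.replace(".json", ".pdf")
        if os.path.exists(pdf_path):
            shutil.copy2(json_path, TRAINING_DIR)
            shutil.copy2(pdf_path, TRAINING_DIR)
            staged += 1
    return staged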
3. The Inference Engine (vLLM)
We use vLLM because its continuous batching and PagedAttention memory management make it significantly faster for serving than stock Hugging Face Transformers. We run it as a background service that exposes an OpenAI-compatible API.
Create the service file: /etc/systemd/system/vllm.service
[Unit]
Description=vLLM Inference Service (Qwen2-VL)
After=network-online.target
Wants=network-online.target
[Service]
User=root
Group=root
WorkingDirectory=/opt/ai
Environment="PYTHONUNBUFFERED=1"
Environment="TERM=dumb"
Environment="NO_COLOR=1"
# The Startup Command
# We run vLLM as an OpenAI-compatible API server on Port 8001
ExecStart=/usr/bin/python3 -m vllm.entrypoints.openai.api_server \
--model /opt/models/Qwen2-VL-7B-Instruct \
--served-model-name Qwen/Qwen2-VL-7B-Instruct \
--port 8001 \
--gpu-memory-utilization 0.95 \
--max-model-len 8192 \
--trust-remote-code
# --- LOGGING REDIRECTION ---
# Appends vLLM logs to the SAME file as the Python API for unified debugging
StandardOutput=append:/var/log/ai.log
StandardError=append:/var/log/ai.log
Restart=always
RestartSec=10
LimitNOFILE=65535
[Install]
WantedBy=multi-user.target
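Once the unit is enabled and running, a quick readiness check is to list the served models through the same openai client the middleware uses later. A minimal sketch:

from openai import OpenAI

# vLLM exposes the standard OpenAI REST surface on port 8001
client = OpenAI(base_url="http://localhost:8001/v1", api_key="EMPTY")

# Prints "Qwen/Qwen2-VL-7B-Instruct" once the model has finished loading
for model in client.models.list():
    print(model.id)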
4. Log Rotation
AI services generate verbose logs. We use logrotate to compress and rotate them daily so old logs don’t fill the disk. The copytruncate directive matters here: both services keep the log file open through systemd’s append: redirection, so the file must be truncated in place rather than renamed.
Create the config: /etc/logrotate.d/ai
/var/log/ai.log {
daily
missingok
rotate 7
compress
delaycompress
notifempty
copytruncate
}
5. The Retraining Script
This script executes the “Self-Healing” process.
How it works with the Folder Logic: Since the Java orchestrator has already filtered the relevant files into /po/training, this script simply loads everything it finds in that folder (BATCH_DIR). It needs no filtering logic of its own; it assumes every file present is a valid “Lesson” to be learned.
Key Features:
- Dynamic Batching: It counts the files in the staging folder and automatically adjusts the gradient accumulation steps.
- Targeted Learning: It reads from /po/training, ensuring it only learns from the specific batch prepared by the orchestrator.
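To make the dynamic-batching idea concrete, this is the effective-batch arithmetic at work (a sketch of the same capping logic that appears inside training.py below):

def dynamic_accumulation_steps(num_files: int, cap: int = 32) -> int:
    """One optimizer update should see the whole staged batch, capped to avoid OOM."""
    return min(num_files, cap)

# per_device_train_batch_size is 1, so the effective batch size equals the
# accumulation steps: with 12 staged files, gradients are averaged over all
# 12 forward passes before the weights update once.
assert dynamic_accumulation_steps(12) == 12
assert dynamic_accumulation_steps(100) == 32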
Create the file: /opt/ai/training.py
#!/usr/bin/env python3
import os
import glob
import sys
import gc
import shutil
import torch
import subprocess
import time
from datetime import datetime
from pdf2image import convert_from_path
from transformers import (
Qwen2VLForConditionalGeneration,
AutoProcessor,
TrainingArguments,
Trainer,
BitsAndBytesConfig
)
from peft import (
LoraConfig,
get_peft_model,
TaskType,
PeftModel,
prepare_model_for_kbit_training
)
# ==========================================
# CONFIGURATION
# ==========================================
# The script looks here. Java ensures this folder ONLY contains "modified=true" files.
BATCH_DIR = "/po/training"
LIVE_MODEL_DIR = "/opt/models/Qwen2-VL-7B-Instruct"
ADAPTER_TEMP_DIR = "/opt/models/adapter_temp"
MERGED_TEMP_DIR = "/opt/models/merged_temp"
SERVICE_NAME = "vllm"
LOG_FILE = "/var/log/ai.log"
TRAINING_PROMPT = """
You are a specialized AI agent for extracting Purchase Order (PO) data.
Your task is to analyze the provided image and extract data into the following strict JSON structure:
{
"header": {
"po_number": "string or null",
"po_date": "YYYY-MM-DD (normalize all dates to this format)",
"vendor_name": "string or null",
"buyer_name": "string or null"
},
"line_items": [
{
"description": "string",
"quantity": number,
"unit_price": number,
"line_total": number
}
],
"summary": {
"subtotal": number,
"tax_amount": number,
"grand_total": number (if missing, calculate sum of line_items)
}
}
CRITICAL RULES:
1. **Date Normalization**: Convert formats like "30-Dec-2025" to ISO 8601 "2025-12-30".
2. **Missing Calculations**: If 'grand_total' is not visible, calculate it by summing the line items.
3. **Null Handling**: If a field is not present, set it to null.
4. **Output Format**: Return ONLY the raw JSON.
"""
# ==========================================
# HELPER FUNCTIONS
# ==========================================
def log(msg):
"""Prints with timestamp (Output redirected to file below)"""
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} [TRAINING] {msg}", flush=True)
def manage_service(action):
log(f"SYSTEMCTL: {action.upper()} {SERVICE_NAME}")
try:
# sudo is required for systemctl
subprocess.run(["sudo", "systemctl", action, SERVICE_NAME], check=True)
if action == "start":
log("Waiting 60s for service to stabilize...")
time.sleep(60)
except subprocess.CalledProcessError as e:
log(f"Error managing service: {e}")
# ==========================================
# DATA LOADING
# ==========================================
def load_dataset():
data = []
pdfs = glob.glob(os.path.join(BATCH_DIR, "*.pdf"))
# 1. List all detected files immediately
if len(pdfs) > 0:
log(f"--- DETECTED {len(pdfs)} PDF FILES IN {BATCH_DIR} ---")
for p in pdfs:
log(f" > {os.path.basename(p)}")
else:
log(f"No PDFs found in {BATCH_DIR}")
return [] # Quick exit
# 2. Validate Pairs
valid_count = 0
for pdf in pdfs:
json_file = pdf.replace(".pdf", ".json")
if os.path.exists(json_file):
with open(json_file, 'r') as f:
target_json = f.read()
data.append({"image_path": pdf, "text_output": target_json})
valid_count += 1
else:
log(f" [SKIP] {os.path.basename(pdf)} (Missing matching .json file)")
log(f"--- STARTING TRAINING ON {valid_count} VALID PAIRS ---")
return data
class QwenDataCollator:
def __init__(self, processor):
self.processor = processor
def __call__(self, batch):
texts, images = [], []
for item in batch:
try:
# SAFE GUARD: Check if key exists before using it
if 'image_path' not in item:
raise ValueError(f"Missing 'image_path' in batch item. Available keys: {list(item.keys())}")
# Convert PDF to Image
pil_images = convert_from_path(item['image_path'])
if len(pil_images) > 0:
images.append(pil_images[0])
else:
# Use .get() to be safe
fname = os.path.basename(item.get('image_path', 'UNKNOWN_FILE'))
log(f"WARNING: PDF {fname} yielded 0 images.")
continue
except Exception as e:
# CHANGED: Use .get() here too so the logger itself doesn't crash
fname = os.path.basename(item.get('image_path', 'UNKNOWN_FILE'))
log(f"ERROR processing {fname}: {str(e)}")
continue
messages = [
{
"role": "user",
"content": [
{"type": "image", "image": item['image_path']},
{"type": "text", "text": TRAINING_PROMPT}
]
},
{
"role": "assistant",
"content": [{"type": "text", "text": item['text_output']}]
}
]
text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=False)
texts.append(text)
if len(images) == 0:
raise ValueError("Batch processing failed: No valid images could be loaded.")
batch_out = self.processor(text=texts, images=images, padding=True, return_tensors="pt")
batch_out["labels"] = batch_out["input_ids"].clone()
return batch_out
# ==========================================
# TRAIN AND UPDATE LOGIC
# ==========================================
def train_and_update():
log("=== STARTING BATCH RETRAIN SEQUENCE ===")
# 1. Stop Service
manage_service("stop")
# 2. Validate Data
dataset = load_dataset()
if not dataset:
log("CRITICAL: No valid training data found. Restarting service and aborting.")
manage_service("start")
sys.exit(0)
# --- NEW: DYNAMIC BATCH CONFIGURATION ---
    # This logic balances overfitting against catastrophic forgetting
num_files = len(dataset)
# We set gradient_accumulation to match the number of files.
# This forces the model to look at EVERY file in the batch before updating its brain.
dynamic_accumulation = num_files
# Cap it for safety (e.g., if you have 100 files, we don't want to OOM)
if dynamic_accumulation > 32:
dynamic_accumulation = 32
log(f"Dynamic Configuration: Accumulating gradients over {dynamic_accumulation} steps.")
try:
# Load Processor
processor = AutoProcessor.from_pretrained(
LIVE_MODEL_DIR,
min_pixels=256*28*28,
max_pixels=1280*28*28,
use_fast=False
)
# Load Model in 4-bit (QLoRA)
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16
)
model = Qwen2VLForConditionalGeneration.from_pretrained(
LIVE_MODEL_DIR,
quantization_config=bnb_config,
device_map="auto",
)
model = prepare_model_for_kbit_training(model)
except Exception as e:
log(f"Failed to load live model: {e}")
manage_service("start")
sys.exit(1)
# 3. Setup LoRA
peft_config = LoraConfig(
task_type="CAUSAL_LM",
inference_mode=False,
r=64,
lora_alpha=128,
lora_dropout=0.1,
target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]
)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()
# 4. Train (High Accuracy Settings)
args = TrainingArguments(
output_dir=ADAPTER_TEMP_DIR,
num_train_epochs=20, # FIXED: Consistent repetition regardless of batch size
learning_rate=5e-5, # LOWERED: From 1e-3 to 5e-5 to prevent "memorization"
per_device_train_batch_size=1,
gradient_accumulation_steps=dynamic_accumulation, # DYNAMIC: Average error across the whole batch
save_strategy="no",
fp16=False, bf16=True,
report_to="none",
remove_unused_columns=False,
optim="paged_adamw_8bit"
)
trainer = Trainer(
model=model, args=args, train_dataset=dataset,
data_collator=QwenDataCollator(processor)
)
trainer.train()
log(f"Saving Adapter to {ADAPTER_TEMP_DIR}...")
trainer.model.save_pretrained(ADAPTER_TEMP_DIR)
# 5. Cleanup Memory
log("Cleaning GPU Memory for merge...")
del model
del trainer
gc.collect()
torch.cuda.empty_cache()
# 6. Merge Adapter into Base
log("Merging Adapter into Base Model...")
base_model = Qwen2VLForConditionalGeneration.from_pretrained(
LIVE_MODEL_DIR,
torch_dtype=torch.bfloat16,
device_map="auto"
)
model_to_merge = PeftModel.from_pretrained(base_model, ADAPTER_TEMP_DIR)
model_to_merge = model_to_merge.merge_and_unload()
# 7. Swap Folders
log("Swapping Live Model Folders...")
if os.path.exists(MERGED_TEMP_DIR):
shutil.rmtree(MERGED_TEMP_DIR)
# Save new model
model_to_merge.save_pretrained(MERGED_TEMP_DIR)
processor.save_pretrained(MERGED_TEMP_DIR)
# Overwrite Live Directory
shutil.rmtree(LIVE_MODEL_DIR)
shutil.move(MERGED_TEMP_DIR, LIVE_MODEL_DIR)
# Cleanup Adapter Temp
if os.path.exists(ADAPTER_TEMP_DIR):
shutil.rmtree(ADAPTER_TEMP_DIR)
log("SUCCESS: Model Updated. Restarting vLLM Service.")
# 8. Restart Service
manage_service("start")
if __name__ == "__main__":
# --- LOGGING REDIRECTION ---
try:
log_f = open(LOG_FILE, "a", buffering=1)
sys.stdout = log_f
sys.stderr = log_f
except Exception as e:
print(f"Failed to open log file {LOG_FILE}: {e}")
train_and_update()
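Before wiring the script into the API, it is worth smoke-testing the pairing convention it relies on. A small sketch that verifies each staged PDF has a parseable sidecar JSON (the same .pdf-to-.json rule load_dataset applies):

import glob
import json
import os

TRAINING_DIR = "/po/training"  # the same folder training.py reads

for pdf in glob.glob(os.path.join(TRAINING_DIR, "*.pdf")):
    sidecar = pdf.replace(".pdf", ".json")
    if not os.path.exists(sidecar):
        print(f"MISSING JSON: {os.path.basename(pdf)}")
        continue
    try:
        with open(sidecar) as f:
            json.load(f)  # training targets must at least be valid JSON
        print(f"OK: {os.path.basename(pdf)}")
    except json.JSONDecodeError as e:
        print(f"BAD JSON: {os.path.basename(sidecar)} ({e})")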
6. The Unified AI Service
This FastAPI application is the main entry point for the Java Application. It unifies three responsibilities:
- Inference: Receives files, processes them via vLLM, and returns JSON.
- Retraining Trigger: Accepts a command to launch the training.py script.
- Reset Capability: Allows “Time Travel” demonstrations by restoring the model from backup.
Create the file: /opt/ai/ai.py
import os
import base64
import io
import json
import re
import time
import math
import logging
import subprocess
import shutil
from typing import List, Optional, Any, Dict, Tuple
from datetime import datetime
# Third-party imports
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field, field_validator
from pdf2image import convert_from_path
from openai import AsyncOpenAI
# ==========================================
# CONFIGURATION
# ==========================================
# Paths
INCOMING_PATH = "/po/incoming/"
BATCH_SCRIPT = "/opt/ai/training.py"
LOG_FILE = "/var/log/ai.log"
# Models & API
VLLM_API_URL = "http://localhost:8001/v1"
MODEL_NAME = "Qwen/Qwen2-VL-7B-Instruct"
# Prompts
SYSTEM_PROMPT = """
You are a specialized AI agent for extracting Purchase Order (PO) data.
Your task is to analyze the provided image and extract data into the following strict JSON structure:
{
"header": {
"po_number": "string or null",
"po_date": "YYYY-MM-DD (normalize all dates to this format)",
"vendor_name": "string or null",
"buyer_name": "string or null"
},
"line_items": [
{
"description": "string",
"quantity": number,
"unit_price": number,
"line_total": number
}
],
"summary": {
"subtotal": number,
"tax_amount": number,
"grand_total": number (if missing, calculate sum of line_items)
}
}
CRITICAL RULES:
1. **Date Normalization**: Convert formats like "30-Dec-2025" to ISO 8601 "2025-12-30".
2. **Missing Calculations**: If 'grand_total' is not visible, calculate it by summing the line items.
3. **Null Handling**: If a field is not present, set it to null.
4. **Output Format**: Return ONLY the raw JSON.
"""
# ==========================================
# LOGGING SETUP
# ==========================================
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(name)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("AI-Service")
# ==========================================
# DATA MODELS
# ==========================================
def safe_float(v: Any) -> Optional[float]:
if v is None:
return None
try:
clean_v = str(v).replace("$", "").replace(",", "").strip()
return float(clean_v)
except ValueError:
return None
class POHeader(BaseModel):
po_number: Optional[str] = None
po_date: Optional[str] = None
vendor_name: Optional[str] = None
buyer_name: Optional[str] = None
class LineItem(BaseModel):
description: Optional[str] = None
quantity: Optional[float] = None
unit_price: Optional[float] = None
line_total: Optional[float] = None
@field_validator('quantity', 'unit_price', 'line_total', mode='before')
@classmethod
def clean_numbers(cls, v):
return safe_float(v)
class POSummary(BaseModel):
subtotal: Optional[float] = None
tax_amount: Optional[float] = None
grand_total: Optional[float] = None
@field_validator('subtotal', 'tax_amount', 'grand_total', mode='before')
@classmethod
def clean_numbers(cls, v):
return safe_float(v)
class PurchaseOrder(BaseModel):
header: POHeader
line_items: List[LineItem] = []
summary: POSummary
def check_completeness_status(self) -> dict:
missing_fields = []
if not self.header.po_number: missing_fields.append("header.po_number")
if self.summary.grand_total is None: missing_fields.append("summary.grand_total")
if not self.line_items: missing_fields.append("line_items")
if missing_fields:
return {
"status": "REVIEW_REQUIRED",
"validation_errors": [f"Missing critical field: {f}" for f in missing_fields]
}
return {"status": "VALID", "validation_errors": []}
class ExtractionRequest(BaseModel):
filename: str
# ==========================================
# CORE LOGIC (INFERENCE)
# ==========================================
app = FastAPI(title="AI-Assisted PO Extraction Service")
def load_and_convert_pdf(filename: str) -> List[str]:
file_path = os.path.join(INCOMING_PATH, filename)
if not os.path.exists(file_path):
logger.error(f"File not found: {file_path}")
raise HTTPException(status_code=404, detail=f"File not found at {file_path}")
try:
images = convert_from_path(file_path)
encoded_images = []
for img in images:
img.thumbnail((1024, 1024))
buffered = io.BytesIO()
img.save(buffered, format="JPEG")
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
encoded_images.append(f"data:image/jpeg;base64,{img_str}")
logger.info(f"Successfully converted PDF: {filename} ({len(encoded_images)} pages)")
return encoded_images
except Exception as e:
logger.error(f"PDF Conversion failed for {filename}: {e}")
raise HTTPException(status_code=500, detail=f"PDF Error: {str(e)}")
async def call_vllm_model(base64_images: List[str]) -> Tuple[Dict[str, Any], float]:
"""Sends image to local vLLM server and calculates confidence."""
client = AsyncOpenAI(base_url=VLLM_API_URL, api_key="EMPTY")
messages = [{
"role": "user",
"content": [
{"type": "text", "text": SYSTEM_PROMPT},
{"type": "image_url", "image_url": {"url": base64_images[0]}}
],
}]
try:
logger.info("Sending request to vLLM model...")
response = await client.chat.completions.create(
model=MODEL_NAME,
messages=messages,
max_tokens=2048,
temperature=0.1,
logprobs=True,
top_logprobs=1
)
raw_content = response.choices[0].message.content
# Calculate Confidence Score
confidence = 0.0
if response.choices[0].logprobs and response.choices[0].logprobs.content:
logprobs = [token.logprob for token in response.choices[0].logprobs.content]
if logprobs:
avg_logprob = sum(logprobs) / len(logprobs)
confidence = math.exp(avg_logprob)
# Cleanup JSON
match = re.search(r'\{.*\}', raw_content, re.DOTALL)
if match:
return json.loads(match.group()), confidence
return json.loads(raw_content), confidence
except Exception as e:
logger.error(f"Inference failed: {e}")
return {"error": str(e)}, 0.0
# ==========================================
# CORE LOGIC (RETRAINING)
# ==========================================
def run_batch_process():
"""Executes the batch training script in a separate process"""
try:
logger.info(f"Launching worker: {BATCH_SCRIPT}")
# Open log file in append mode to capture stdout/stderr from the subprocess
with open(LOG_FILE, "a") as outfile:
subprocess.run(
["/usr/bin/python3", BATCH_SCRIPT],
stdout=outfile,
stderr=outfile,
text=True
)
logger.info("Worker process finished.")
except Exception as e:
logger.error(f"Failed to launch worker: {e}")
def run_reset_script():
"""
1. Stop vLLM
2. Wipe the 'live' model
3. Restore from 'base' backup
4. Start vLLM
"""
try:
# We write to the log manually here since we are inside a background task
# and want strict sequencing.
with open(LOG_FILE, "a") as log_f:
log_f.write(f"\n[{datetime.now()}] RESET: Starting System Reset...\n")
# 1. Stop Service
log_f.write("RESET: Stopping vLLM Service...\n")
subprocess.run(["systemctl", "stop", "vllm"], stdout=log_f, stderr=log_f)
# 2. Delete Live Model
live_model_dir = "/opt/models/Qwen2-VL-7B-Instruct"
log_f.write(f"RESET: Wiping {live_model_dir}...\n")
if os.path.exists(live_model_dir):
shutil.rmtree(live_model_dir)
os.makedirs(live_model_dir, exist_ok=True)
# 3. Copy Base Model (Backup) -> Live
backup_dir = "/opt/models/base/Qwen2-VL-7B-Instruct"
log_f.write(f"RESET: Restoring from {backup_dir}...\n")
subprocess.run(f"cp -r {backup_dir}/* {live_model_dir}/", shell=True, stdout=log_f, stderr=log_f)
# 4. Start Service
log_f.write("RESET: Starting vLLM Service...\n")
subprocess.run(["systemctl", "start", "vllm"], stdout=log_f, stderr=log_f)
log_f.write(f"[{datetime.now()}] RESET: Complete.\n")
except Exception as e:
logger.error(f"RESET FAILED: {str(e)}")
# ==========================================
# API ENDPOINTS
# ==========================================
@app.get("/health")
def health_check():
return {"status": "active", "service": "Unified AI Manager"}
@app.post("/extract")
async def extract_po_data(request: ExtractionRequest):
logger.info(f"Received extraction request for: {request.filename}")
start_time = time.time()
# Process
try:
base64_images = load_and_convert_pdf(request.filename)
raw_result, confidence = await call_vllm_model(base64_images)
except Exception as e:
logger.error(f"Pipeline failed for {request.filename}: {e}")
raise HTTPException(status_code=500, detail=str(e))
end_time = time.time()
processing_duration = round(end_time - start_time, 2)
if "error" in raw_result:
logger.error(f"Model returned error for {request.filename}: {raw_result['error']}")
return {
"status": "SYSTEM_ERROR",
"data": None,
"error": raw_result["error"],
"processing_time_seconds": processing_duration,
"confidence_score": 0.0
}
# Validate
try:
po_obj = PurchaseOrder(**raw_result)
quality_check = po_obj.check_completeness_status()
logger.info(f"Success: {request.filename} | Status: {quality_check['status']} | Conf: {confidence:.2f}")
return {
"filename": request.filename,
"status": quality_check["status"],
"validation_errors": quality_check["validation_errors"],
"data": po_obj.model_dump(),
"processing_time_seconds": processing_duration,
"confidence_score": round(confidence, 4)
}
except Exception as e:
logger.warning(f"Validation failed for {request.filename}: {e}")
return {
"filename": request.filename,
"status": "REVIEW_REQUIRED",
"validation_errors": [f"Structure Failure: {str(e)}"],
"data": raw_result,
"processing_time_seconds": processing_duration,
"confidence_score": round(confidence, 4)
}
@app.post("/start-retrain")
async def start_retrain_endpoint(background_tasks: BackgroundTasks):
"""
Endpoint called by Java to trigger the retraining batch process.
"""
if not os.path.exists(BATCH_SCRIPT):
logger.error(f"Script missing: {BATCH_SCRIPT}")
raise HTTPException(status_code=500, detail=f"Worker script not found at {BATCH_SCRIPT}")
logger.info("Received /start-retrain request. Queueing background task.")
background_tasks.add_task(run_batch_process)
return {
"status": "accepted",
"message": "Retraining queued. Check logs at /var/log/ai.log"
}
@app.post("/reset")
async def reset_poc(background_tasks: BackgroundTasks):
"""
Resets the PoC to the original 'Clean' model state.
"""
logger.info("Received /reset request. Triggering system reset.")
background_tasks.add_task(run_reset_script)
return {"status": "Reset triggered. vLLM is restarting (Wait ~60s)."}
if __name__ == "__main__":
import uvicorn
# Create log file if it doesn't exist
if not os.path.exists(LOG_FILE):
try:
open(LOG_FILE, 'a').close()
except PermissionError:
print(f"CRITICAL: Cannot write to {LOG_FILE}. Run as root or change permissions.")
# Run on Port 8000
uvicorn.run(app, host="0.0.0.0", port=8000)
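For reference, this is the contract the Java application consumes. A minimal standard-library sketch of a call to /extract (the filename is hypothetical and must already exist in /po/incoming):

import json
import urllib.request

payload = json.dumps({"filename": "PO-12345.pdf"}).encode()
req = urllib.request.Request(
    "http://localhost:8000/extract",
    data=payload,
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    result = json.load(resp)

# status is VALID, REVIEW_REQUIRED, or SYSTEM_ERROR; confidence_score is
# exp(mean token logprob), so values near 1.0 indicate high model certainty
print(result["status"], result["confidence_score"])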
7. Managing the AI Service with systemd
We wrap our Unified AI Middleware in a systemd unit.
Configuration Note: We use After=vllm.service so systemd launches our Python code only after the vLLM unit has started. Note that After= only orders startup; it does not wait for the model to finish loading, so the middleware must tolerate a backend that is still warming up.
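If you want a hard readiness guarantee, the middleware (or an ExecStartPre hook) can poll the backend before accepting traffic. A sketch of one way to do that, not part of the files above:

import time
import urllib.error
import urllib.request

def wait_for_vllm(url: str = "http://localhost:8001/v1/models", timeout_s: int = 300) -> bool:
    """Poll the vLLM model list until it answers, or give up after timeout_s."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=5):
                return True  # endpoint answered: the model is loaded and serving
        except (urllib.error.URLError, OSError):
            time.sleep(5)  # not up yet, retry shortly
    return False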
Create the service file: /etc/systemd/system/ai.service
[Unit]
Description=Unified AI Service (Inference & Retraining)
# Order after vLLM so the backend is already starting when the middleware comes up
After=network-online.target vllm.service
Wants=network-online.target
[Service]
User=root
Group=root
WorkingDirectory=/opt/ai/
Environment="PYTHONUNBUFFERED=1"
Environment="PYTHONPATH=/opt/ai"
# Run the merged script directly (It contains the uvicorn.run entry point)
ExecStart=/usr/bin/python3 /opt/ai/ai.py
# --- LOGGING ---
# Appending to the same log file used by the Python logger
StandardOutput=append:/var/log/ai.log
StandardError=append:/var/log/ai.log
# Restart Logic
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
8. Deployment
With all files in place, we reload the system daemon to recognize the new services and start them in order.
# Reload systemd to see the new files
systemctl daemon-reload
# Start the Inference Engine first
systemctl enable vllm.service
systemctl start vllm.service
# (Optional) Watch the logs to see it load the model
# tail -f /var/log/ai.log
# Once vLLM is stable, start the Middleware
systemctl enable ai.service
systemctl start ai.service
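Finally, a quick end-to-end smoke test once both units report active; a minimal standard-library check against the /health endpoint defined in ai.py:

import json
import urllib.request

# Expected response: {"status": "active", "service": "Unified AI Manager"}
with urllib.request.urlopen("http://localhost:8000/health", timeout=10) as resp:
    print(json.load(resp))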