react
"use client"
import React, { useRef, useEffect, useState } from 'react';
import 'xterm/css/xterm.css';
/**
 * Full-viewport web terminal.
 *
 * Mounts an xterm.js terminal, connects it to the backend over a WebSocket
 * (port 8000, path `/ws`, initial size passed as `rows`/`cols` query
 * parameters) and performs local line editing: typed characters are echoed
 * and buffered client-side, and a complete line is sent when Enter is
 * pressed. Up/Down arrows cycle through the lines submitted in this session.
 *
 * A status bar shows the current terminal size and the connection state.
 */
const TerminalComponent = () => {
  const terminalRef = useRef<HTMLDivElement>(null);
  // xterm is loaded with require() inside the effect, so no static type is in scope here.
  const terminalInstanceRef = useRef<any>(null);
  const socketRef = useRef<WebSocket | null>(null);
  const [isConnected, setIsConnected] = useState(false);
  const [dimensions, setDimensions] = useState({ cols: 80, rows: 24 });
  const commandHistoryRef = useRef<string[]>([]);
  // Position in the history while browsing; equals history.length when on a fresh line.
  const historyIndexRef = useRef<number>(-1);
  // Text typed on the current line that has not been sent yet.
  const currentLineBufferRef = useRef<string>('');

  useEffect(() => {
    const container = terminalRef.current;
    // Guard instead of asserting: without a browser or a mounted container there is nothing to attach to.
    if (typeof window === 'undefined' || !container) return;

    // Green "$ " prompt drawn locally by the client.
    const PROMPT = '\x1b[32m$\x1b[0m ';

    // Required inside the effect so the xterm packages are only evaluated in the browser.
    const { Terminal } = require('xterm');
    const { FitAddon } = require('xterm-addon-fit');
    const { WebLinksAddon } = require('xterm-addon-web-links');
    const { Unicode11Addon } = require('xterm-addon-unicode11');

    const term = new Terminal({
      convertEol: true,
      cursorBlink: true,
      fontSize: 14,
      fontFamily: "'Fira Code', 'Courier New', monospace",
      theme: {
        background: '#1e1e1e',
        foreground: '#f0f0f0',
        cursor: '#f0f0f0',
        selection: 'rgba(255, 255, 255, 0.3)',
      },
      allowProposedApi: true,
    });

    const fitAddon = new FitAddon();
    const webLinksAddon = new WebLinksAddon();
    const unicode11Addon = new Unicode11Addon();
    term.loadAddon(fitAddon);
    term.loadAddon(webLinksAddon);
    term.loadAddon(unicode11Addon);
    term.unicode.activeVersion = '11';
    term.open(container);
    fitAddon.fit();

    // Publish the fitted size to the status bar and to the server.
    const { cols, rows } = term;
    setDimensions({ cols, rows });
    terminalInstanceRef.current = term;

    // Pages served over HTTPS may only open secure WebSockets, so follow the page's scheme.
    const scheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
    const socket = new WebSocket(
      `${scheme}://${window.location.hostname}:8000/ws?rows=${rows}&cols=${cols}`
    );
    socketRef.current = socket;

    socket.onopen = () => {
      setIsConnected(true);
      term.write(PROMPT);
    };
    socket.onmessage = (event) => {
      term.write(event.data);
    };
    socket.onclose = () => {
      setIsConnected(false);
      term.write('\r\n\x1b[31mConnection closed. Refresh page to reconnect.\x1b[0m\r\n');
    };
    socket.onerror = (error) => {
      console.error('WebSocket Error:', error);
      term.write('\r\n\x1b[31mConnection error. Refresh page.\x1b[0m\r\n');
    };

    /** Re-fit the terminal to its container and tell the server the new size. */
    const handleResize = () => {
      fitAddon.fit();
      const { cols, rows } = term;
      setDimensions({ cols, rows });
      if (socket.readyState === WebSocket.OPEN) {
        // "ESC [ 8 ; rows ; cols t" is the resize message the server looks for.
        socket.send(`\x1b[8;${rows};${cols}t`);
      }
    };

    /** Ctrl+C: forward the interrupt, drop the pending line and redraw the prompt. */
    const interrupt = () => {
      socket.send('\x03');
      currentLineBufferRef.current = '';
      term.write(`^C\r\n${PROMPT}`);
    };

    /** Enter: send the buffered line (recording non-blank lines in the history). */
    const submitLine = () => {
      const currentLine = currentLineBufferRef.current;
      if (currentLine.trim().length > 0) {
        commandHistoryRef.current.push(currentLine);
        historyIndexRef.current = commandHistoryRef.current.length;
        socket.send(currentLine + '\n');
      } else {
        socket.send('\n');
      }
      currentLineBufferRef.current = '';
    };

    /** Backspace: remove the last buffered character and erase it on screen. */
    const eraseLastChar = () => {
      if (currentLineBufferRef.current.length === 0) return;
      term.write('\b \b');
      // Drop one whole code point so a surrogate pair is never cut in half.
      currentLineBufferRef.current = Array.from(currentLineBufferRef.current)
        .slice(0, -1)
        .join('');
    };

    /** Up ('A') / Down ('B'): replace the current line with a history entry. */
    const recallHistory = (arrowKey: 'A' | 'B') => {
      const history = commandHistoryRef.current;
      if (history.length === 0) return;
      // Clear the visible line and redraw the prompt before showing the entry.
      term.write(`\x1b[2K\r${PROMPT}`);
      if (arrowKey === 'A') {
        if (historyIndexRef.current > 0) {
          historyIndexRef.current--;
        }
      } else if (historyIndexRef.current < history.length - 1) {
        historyIndexRef.current++;
      } else {
        // Moving past the newest entry returns to an empty line.
        historyIndexRef.current = history.length;
      }
      const recalled = history[historyIndexRef.current] ?? '';
      currentLineBufferRef.current = recalled;
      term.write(recalled);
    };

    /** Apply a single (non-escape) input character to the line editor. */
    const handleChar = (ch: string) => {
      const charCode = ch.charCodeAt(0);
      if (charCode === 3) {
        interrupt();
      } else if (charCode === 13) {
        submitLine();
      } else if (charCode === 127 || charCode === 8) {
        eraseLastChar();
      } else if (charCode >= 32) {
        currentLineBufferRef.current += ch;
        term.write(ch);
      }
      // Any other control character is ignored.
    };

    /** Entry point for everything xterm reports as user input. */
    const handleData = (data: string) => {
      if (socket.readyState !== WebSocket.OPEN) {
        term.write('\r\n\x1b[31mNot connected. Refresh page.\x1b[0m\r\n');
        return;
      }
      if (data.startsWith('\x1b')) {
        // Only the Up/Down arrow sequences are acted on; other escape sequences are dropped.
        const key = data[2];
        if (data[1] === '[' && (key === 'A' || key === 'B')) {
          recallHistory(key);
        }
        return;
      }
      // A chunk can hold several characters (e.g. a paste); handle each one so
      // embedded Enter/Backspace/Ctrl+C are interpreted instead of buffered raw.
      for (const ch of data) {
        handleChar(ch);
      }
    };

    term.onData(handleData);
    window.addEventListener('resize', handleResize);

    return () => {
      window.removeEventListener('resize', handleResize);
      // Detach handlers first so closing does not write to a disposed terminal
      // or update state on an unmounted component.
      socket.onopen = null;
      socket.onmessage = null;
      socket.onclose = null;
      socket.onerror = null;
      // Also abort a socket that is still connecting; otherwise it would stay
      // open after unmount.
      if (
        socket.readyState === WebSocket.OPEN ||
        socket.readyState === WebSocket.CONNECTING
      ) {
        socket.close();
      }
      socketRef.current = null;
      terminalInstanceRef.current = null;
      term.dispose();
    };
  }, []);

  return (
    <div style={{
      height: '100vh',
      width: '100vw',
      display: 'flex',
      flexDirection: 'column',
      backgroundColor: '#1e1e1e'
    }}>
      <div style={{
        padding: '4px 8px',
        backgroundColor: '#1e1e1e',
        color: isConnected ? '#4CAF50' : '#F44336',
        fontSize: '12px',
        borderBottom: '1px solid #333',
        display: 'flex',
        justifyContent: 'space-between'
      }}>
        <span>Web Terminal ({dimensions.cols}x{dimensions.rows})</span>
        <span>{isConnected ? '● Connected' : '○ Disconnected'}</span>
      </div>
      <div
        ref={terminalRef}
        style={{
          flex: 1,
          padding: '8px',
          overflow: 'hidden'
        }}
      />
    </div>
  );
};
export default TerminalComponent;
python
# main.py
import os
import pty
import select
import asyncio
import signal
import fcntl
import termios
import struct
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
# Application instance that the routes below are registered on.
app = FastAPI()

# CORS policy: every origin, method and header is accepted, with credentials allowed.
# NOTE(review): a wildcard origin together with allow_credentials=True is very
# permissive for a server that exposes a shell — confirm this is intended
# outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
class PTYManager:
    """Bridge a WebSocket to an interactive ``bash`` running on a pseudo-terminal.

    ``process_pid`` and ``fd`` hold the child pid and PTY master descriptor of
    the most recently started session; each call to ``handle_websocket`` works
    on its own local pid/fd, so concurrent sessions do not depend on them.
    """

    # Start of the client's resize message: ESC [ 8 ; <rows> ; <cols> t
    _RESIZE_PREFIX = "\x1b[8;"
    # TIOCSWINSZ fields are packed as unsigned shorts ("H"), so sizes must fit in 16 bits.
    _MAX_DIMENSION = 0xFFFF

    def __init__(self):
        self.process_pid = None
        self.fd = None

    @classmethod
    def _parse_dimension(cls, raw, default):
        """Return ``raw`` as a terminal dimension, or ``default`` if it is not an int in 1..65535."""
        try:
            value = int(raw)
        except (TypeError, ValueError):
            return default
        if 1 <= value <= cls._MAX_DIMENSION:
            return value
        return default

    @classmethod
    def _parse_resize(cls, message):
        """Return ``(rows, cols)`` from a resize message, or ``None`` if it is not a valid one."""
        if not message.startswith(cls._RESIZE_PREFIX):
            return None
        # "\x1b[8;R;Ct" -> drop the leading "\x1b[" and the final character -> "8;R;C"
        fields = message[2:-1].split(";")
        try:
            rows, cols = int(fields[1]), int(fields[2])
        except (IndexError, ValueError):
            return None
        if not (1 <= rows <= cls._MAX_DIMENSION and 1 <= cols <= cls._MAX_DIMENSION):
            return None
        return rows, cols

    @staticmethod
    def _set_winsize(fd, rows, cols):
        """Apply a window size to the terminal behind ``fd``."""
        winsize = struct.pack("HHHH", rows, cols, 0, 0)
        fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)

    @staticmethod
    def _spawn_shell(rows, cols):
        """Fork a PTY running bash and return ``(pid, master_fd)`` in the parent."""
        pid, fd = pty.fork()
        if pid == 0:  # Child process
            try:
                env = os.environ.copy()
                env["TERM"] = "xterm-256color"
                env["COLORTERM"] = "truecolor"
                winsize = struct.pack("HHHH", rows, cols, 0, 0)
                fcntl.ioctl(pty.STDIN_FILENO, termios.TIOCSWINSZ, winsize)
                # Disable terminal echo before the shell starts.
                attrs = termios.tcgetattr(pty.STDIN_FILENO)
                attrs[3] = attrs[3] & ~termios.ECHO
                termios.tcsetattr(pty.STDIN_FILENO, termios.TCSANOW, attrs)
                os.execvpe("bash", ["bash"], env)
            finally:
                # Only reached if setup or exec failed: never let the forked
                # copy of the server keep running.
                os._exit(1)
        return pid, fd

    @staticmethod
    async def _forward_output(fd, websocket):
        """Send everything the shell writes to the client until the PTY closes."""
        import codecs

        loop = asyncio.get_running_loop()
        # Incremental decoding keeps multi-byte characters intact when they
        # straddle two 1024-byte reads.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        readable = asyncio.Event()

        def _on_readable():
            # One-shot: unregister until the next wait so a slow send does not
            # make the loop spin on an fd that stays readable.
            loop.remove_reader(fd)
            readable.set()

        try:
            while True:
                readable.clear()
                loop.add_reader(fd, _on_readable)
                await readable.wait()
                try:
                    chunk = os.read(fd, 1024)
                except OSError:
                    # Reading the master side fails once the shell has gone away.
                    break
                if not chunk:
                    break
                text = decoder.decode(chunk)
                if text:
                    await websocket.send_text(text)
        finally:
            loop.remove_reader(fd)

    async def _forward_input(self, fd, websocket):
        """Apply client messages to the PTY: resize requests or raw input."""
        while True:
            message = await websocket.receive_text()
            if message.startswith(self._RESIZE_PREFIX):
                size = self._parse_resize(message)
                if size is None:
                    print(f"Ignoring malformed resize request: {message!r}")
                else:
                    self._set_winsize(fd, *size)
                continue
            payload = message.encode()
            # os.write may accept only part of the buffer; keep going until done.
            while payload:
                written = os.write(fd, payload)
                payload = payload[written:]

    @staticmethod
    async def _reap(pid):
        """Terminate the shell and collect its exit status so no zombie is left."""
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            return
        # Poll for about a second without blocking the event loop.
        for _ in range(20):
            try:
                reaped, _status = os.waitpid(pid, os.WNOHANG)
            except ChildProcessError:
                return
            if reaped:
                return
            await asyncio.sleep(0.05)
        try:
            os.kill(pid, signal.SIGKILL)
            os.waitpid(pid, 0)
        except (ProcessLookupError, ChildProcessError):
            pass

    async def handle_websocket(self, websocket: "WebSocket"):
        """Serve one terminal session over ``websocket``.

        Accepts the connection, starts bash on a PTY sized from the ``rows``
        and ``cols`` query parameters (defaults 24x80 when missing or invalid)
        and relays data in both directions until the client disconnects or the
        shell exits. The PTY is closed and the child reaped on the way out.
        """
        await websocket.accept()
        query_params = dict(websocket.query_params)
        rows = self._parse_dimension(query_params.get('rows', '24'), 24)
        cols = self._parse_dimension(query_params.get('cols', '80'), 80)

        pid, fd = self._spawn_shell(rows, cols)
        self.process_pid = pid
        self.fd = fd

        tasks = [
            asyncio.ensure_future(self._forward_output(fd, websocket)),
            asyncio.ensure_future(self._forward_input(fd, websocket)),
        ]
        try:
            # Whichever direction ends first (shell exit or client disconnect)
            # ends the session.
            done, _pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                error = task.exception()
                if error is None:
                    continue
                if isinstance(error, (WebSocketDisconnect, OSError)):
                    print(f"Connection closed: {error}")
                else:
                    raise error
        finally:
            try:
                for task in tasks:
                    task.cancel()
                # Let the relays unregister from the event loop before the fd goes away.
                await asyncio.gather(*tasks, return_exceptions=True)
            finally:
                os.close(fd)
                if self.fd == fd:
                    self.fd = None
                    self.process_pid = None
            await self._reap(pid)
# Single manager instance shared by every connection to /ws.
pty_manager = PTYManager()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Hand an incoming ``/ws`` connection to the shared PTY manager."""
    session = pty_manager.handle_websocket(websocket)
    await session
@app.get("/")
async def health_check():
    """Health probe for ``GET /``; always answers ``{"status": "ok"}``."""
    status_payload = dict(status="ok")
    return status_payload