boilerplate-codes
Terminal

react

"use client"
import React, { useRef, useEffect, useState } from 'react';
import 'xterm/css/xterm.css';
 
/**
 * Full-screen web terminal backed by a WebSocket.
 *
 * Renders an xterm.js terminal, connects it to `/ws` on port 8000 of the
 * current host, and performs line editing locally: typed characters are
 * echoed and buffered in the browser and only sent to the server when Enter
 * is pressed. Up/Down arrows walk an in-memory command history.
 */
const TerminalComponent = () => {
    const terminalRef = useRef<HTMLDivElement>(null);
    // Only ever assigned (never read) in this component, so `unknown` suffices.
    const terminalInstanceRef = useRef<unknown>(null);
    const socketRef = useRef<WebSocket | null>(null);
    const [isConnected, setIsConnected] = useState(false);
    const [dimensions, setDimensions] = useState({ cols: 80, rows: 24 });
    const commandHistoryRef = useRef<string[]>([]);
    const historyIndexRef = useRef<number>(-1);
    const currentLineBufferRef = useRef<string>('');
 
    useEffect(() => {
        const container = terminalRef.current;
        // Nothing to attach to during server rendering or before the div exists.
        if (typeof window === 'undefined' || !container) return;
        
        // Loaded lazily inside the effect so the xterm packages are only
        // evaluated in the browser.
        const { Terminal } = require('xterm');
        const { FitAddon } = require('xterm-addon-fit');
        const { WebLinksAddon } = require('xterm-addon-web-links');
        const { Unicode11Addon } = require('xterm-addon-unicode11');
 
        const term = new Terminal({
            convertEol: true,
            cursorBlink: true,
            fontSize: 14,
            fontFamily: "'Fira Code', 'Courier New', monospace",
            theme: {
                background: '#1e1e1e',
                foreground: '#f0f0f0',
                cursor: '#f0f0f0',
                selection: 'rgba(255, 255, 255, 0.3)',
            },
            allowProposedApi: true,
        });
 
        // Initialize addons
        const fitAddon = new FitAddon();
        const webLinksAddon = new WebLinksAddon();
        const unicode11Addon = new Unicode11Addon();
 
        term.loadAddon(fitAddon);
        term.loadAddon(webLinksAddon);
        term.loadAddon(unicode11Addon);
        term.unicode.activeVersion = '11';
 
        term.open(container);
        fitAddon.fit();
        
        // Set initial dimensions
        const { cols, rows } = term;
        setDimensions({ cols, rows });
        terminalInstanceRef.current = term;
 
        // Browsers block plain ws:// connections from pages served over
        // HTTPS, so follow the page's own scheme.
        const wsScheme = window.location.protocol === 'https:' ? 'wss' : 'ws';

        // Create WebSocket connection with size parameters
        const socket = new WebSocket(
            `${wsScheme}://${window.location.hostname}:8000/ws?rows=${rows}&cols=${cols}`
        );
        socketRef.current = socket;
 
        socket.onopen = () => {
            setIsConnected(true);
            // Write initial prompt
            term.write('\x1b[32m$\x1b[0m ');
        };
 
        socket.onmessage = (event) => {
            term.write(event.data);
        };
 
        socket.onclose = () => {
            setIsConnected(false);
            term.write('\r\n\x1b[31mConnection closed. Refresh page to reconnect.\x1b[0m\r\n');
        };
 
        socket.onerror = (error) => {
            console.error('WebSocket Error:', error);
            term.write('\r\n\x1b[31mConnection error. Refresh page.\x1b[0m\r\n');
        };
 
        // Refit the terminal to its container and tell the server the new
        // size using an `ESC [ 8 ; rows ; cols t` message.
        const handleResize = () => {
            fitAddon.fit();
            const { cols, rows } = term;
            setDimensions({ cols, rows });
            
            // Send new size to server
            if (socket.readyState === WebSocket.OPEN) {
                socket.send(`\x1b[8;${rows};${cols}t`);
            }
        };
 
        // Local line discipline: interprets each chunk of terminal input by
        // its first character code.
        const handleData = (data: string) => {
            if (socket.readyState !== WebSocket.OPEN) {
                term.write('\r\n\x1b[31mNot connected. Refresh page.\x1b[0m\r\n');
                return;
            }
 
            const charCode = data.charCodeAt(0);
 
            // Handle Ctrl+C: forward the interrupt and start a fresh line
            if (charCode === 3) {
                socket.send('\x03');
                currentLineBufferRef.current = '';
                term.write('^C\r\n\x1b[32m$\x1b[0m ');
                return;
            }
 
            // Handle Enter: send the buffered line (recording non-blank
            // lines in history) and reset the buffer
            if (charCode === 13) {
                const currentLine = currentLineBufferRef.current;
                if (currentLine.trim().length > 0) {
                    commandHistoryRef.current.push(currentLine);
                    // Point one past the newest entry so ArrowUp recalls it.
                    historyIndexRef.current = commandHistoryRef.current.length;
                    socket.send(currentLine + '\n');
                } else {
                    socket.send('\n');
                }
                currentLineBufferRef.current = '';
                return;
            }
 
            // Handle Backspace (DEL or BS)
            if (charCode === 127 || charCode === 8) {
                if (currentLineBufferRef.current.length > 0) {
                    // Move left, overwrite with a space, move left again.
                    term.write('\b \b');
                    currentLineBufferRef.current = currentLineBufferRef.current.slice(0, -1);
                }
                return;
            }
 
            // Handle Arrow Keys (ESC [ A = up, ESC [ B = down); every other
            // escape sequence is swallowed
            if (charCode === 27 && data.length >= 2 && data[1] === '[') {
                const arrowKey = data[2];
                if (arrowKey === 'A' || arrowKey === 'B') {
                    if (commandHistoryRef.current.length === 0) return;
                    
                    // Clear current line and redraw the prompt
                    term.write('\x1b[2K\r\x1b[32m$\x1b[0m ');
                    
                    if (arrowKey === 'A') {
                        if (historyIndexRef.current > 0) {
                            historyIndexRef.current--;
                        }
                    } else {
                        if (historyIndexRef.current < commandHistoryRef.current.length - 1) {
                            historyIndexRef.current++;
                        } else {
                            // Past the newest entry: back to an empty line.
                            historyIndexRef.current = commandHistoryRef.current.length;
                        }
                    }
                    
                    if (historyIndexRef.current < commandHistoryRef.current.length) {
                        currentLineBufferRef.current = commandHistoryRef.current[historyIndexRef.current];
                        term.write(currentLineBufferRef.current);
                    } else {
                        currentLineBufferRef.current = '';
                    }
                }
                return;
            }
 
            // Handle printable characters: echo locally and buffer
            if (charCode >= 32) {
                currentLineBufferRef.current += data;
                term.write(data);
            }
        };
 
        term.onData(handleData);
        window.addEventListener('resize', handleResize);
 
        return () => {
            window.removeEventListener('resize', handleResize);

            // Detach the handlers first: they write to `term` and update
            // state, neither of which is valid once this effect is torn down.
            socket.onopen = null;
            socket.onmessage = null;
            socket.onclose = null;
            socket.onerror = null;

            // Also close sockets that are still connecting (e.g. when the
            // effect is torn down right after mount); otherwise the
            // connection would open later and stay alive with no owner.
            if (
                socket.readyState === WebSocket.OPEN ||
                socket.readyState === WebSocket.CONNECTING
            ) {
                socket.close();
            }
            socketRef.current = null;

            term.dispose();
            terminalInstanceRef.current = null;
        };
    }, []);
 
    return (
        <div style={{ 
            height: '100vh', 
            width: '100vw',
            display: 'flex', 
            flexDirection: 'column',
            backgroundColor: '#1e1e1e'
        }}>
            <div style={{ 
                padding: '4px 8px', 
                backgroundColor: '#1e1e1e', 
                color: isConnected ? '#4CAF50' : '#F44336',
                fontSize: '12px',
                borderBottom: '1px solid #333',
                display: 'flex',
                justifyContent: 'space-between'
            }}>
                <span>Web Terminal ({dimensions.cols}x{dimensions.rows})</span>
                {/* \u25CF = filled circle, \u25CB = hollow circle; escaped so
                    the glyphs survive a file re-encoding. */}
                <span>{isConnected ? '\u25CF Connected' : '\u25CB Disconnected'}</span>
            </div>
            <div
                ref={terminalRef}
                style={{ 
                    flex: 1,
                    padding: '8px',
                    overflow: 'hidden'
                }}
            />
        </div>
    );
};
 
export default TerminalComponent;

python

# main.py
import os
import pty
import select
import asyncio
import signal
import fcntl
import termios
import struct
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
 
app = FastAPI()

# CORS policy handed to the middleware: every origin, method and header is
# accepted, and credentials are allowed.
# NOTE(review): presumably intended for local development only -- confirm
# before exposing this service publicly.
_cors_policy = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_policy)
 
class PTYManager:
    """Bridge a WebSocket connection to a bash shell running on a pseudo-terminal.

    Each call to :meth:`handle_websocket` forks a new shell on its own PTY,
    forwards the shell's output to the client as text messages, and writes
    client messages to the shell. A message of the form
    ``ESC [ 8 ; rows ; cols t`` is treated as a window-resize request instead
    of shell input.
    """

    # Terminal dimensions are packed as unsigned shorts ("H") for TIOCSWINSZ,
    # so anything outside this range would make struct.pack raise.
    MIN_DIMENSION = 1
    MAX_DIMENSION = 65535

    # Maximum number of bytes pulled from the PTY per read.
    READ_CHUNK_SIZE = 65536

    # Messages starting with this prefix are resize requests, never shell input.
    RESIZE_PREFIX = '\x1b[8;'

    def __init__(self):
        # pid / master fd of the most recently started shell. One manager may
        # serve several connections, so the session itself relies only on its
        # own local pid/fd, never on these attributes.
        self.process_pid = None
        self.fd = None

    @classmethod
    def _clamp_dimension(cls, raw, default):
        """Convert ``raw`` to an int clamped to the valid range; ``default`` if not numeric."""
        try:
            value = int(raw)
        except (TypeError, ValueError):
            return default
        return max(cls.MIN_DIMENSION, min(cls.MAX_DIMENSION, value))

    @classmethod
    def _parse_resize(cls, message):
        """Return ``(rows, cols)`` from an ``ESC [ 8 ; rows ; cols t`` message, or None if malformed."""
        if not message.startswith(cls.RESIZE_PREFIX) or not message.endswith('t'):
            return None
        # Strip the leading "ESC [" and the trailing "t": "8;rows;cols".
        fields = message[2:-1].split(';')
        if len(fields) != 3:
            return None
        rows = cls._clamp_dimension(fields[1], None)
        cols = cls._clamp_dimension(fields[2], None)
        if rows is None or cols is None:
            return None
        return rows, cols

    @staticmethod
    def _set_winsize(fd, rows, cols):
        """Apply a window size of ``rows`` x ``cols`` to the terminal behind ``fd``."""
        winsize = struct.pack("HHHH", rows, cols, 0, 0)
        fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)

    @staticmethod
    async def _reap(pid):
        """Terminate the child ``pid`` and collect its exit status so no zombie is left.

        Sends SIGTERM, then SIGKILL if the child has not exited, polling
        without blocking the event loop (about half a second per signal).
        """
        for sig in (signal.SIGTERM, signal.SIGKILL):
            try:
                os.kill(pid, sig)
            except ProcessLookupError:
                pass
            for _ in range(10):
                try:
                    reaped, _status = os.waitpid(pid, os.WNOHANG)
                except ChildProcessError:
                    return  # already reaped elsewhere
                if reaped:
                    return
                await asyncio.sleep(0.05)

    async def handle_websocket(self, websocket: "WebSocket"):
        """Run one shell session for ``websocket`` until either side goes away.

        Reads the initial terminal size from the ``rows`` / ``cols`` query
        parameters (defaults 24 x 80; non-numeric values fall back to the
        defaults, out-of-range values are clamped). On exit the PTY is closed
        and the shell process is terminated and reaped.
        """
        import codecs  # only needed here, for incremental UTF-8 decoding

        await websocket.accept()

        # Get terminal size from WebSocket query params
        query_params = dict(websocket.query_params)
        rows = self._clamp_dimension(query_params.get('rows', '24'), 24)
        cols = self._clamp_dimension(query_params.get('cols', '80'), 80)

        # Fork PTY with proper size
        pid, fd = pty.fork()

        if pid == 0:  # Child process
            try:
                # Set terminal environment
                env = os.environ.copy()
                env["TERM"] = "xterm-256color"
                env["COLORTERM"] = "truecolor"

                # Set window size
                self._set_winsize(pty.STDIN_FILENO, rows, cols)

                # Disable echo in the child process
                attrs = termios.tcgetattr(pty.STDIN_FILENO)
                attrs[3] = attrs[3] & ~termios.ECHO
                termios.tcsetattr(pty.STDIN_FILENO, termios.TCSANOW, attrs)

                os.execvpe("bash", ["bash"], env)
            finally:
                # Only reached when setup or exec failed: the forked child
                # must never continue running the server's code.
                os._exit(127)

        # Parent process
        self.process_pid = pid
        self.fd = fd

        loop = asyncio.get_running_loop()
        # Incremental decoding keeps multi-byte characters intact when they
        # are split across two reads.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        readable = asyncio.Event()

        def mark_readable():
            # One-shot notification; re-armed by pump_output before each wait
            # so the loop does not spin while a send is in progress.
            loop.remove_reader(fd)
            readable.set()

        async def pump_output():
            """Forward shell output to the client until the PTY reports EOF."""
            while True:
                loop.add_reader(fd, mark_readable)
                await readable.wait()
                readable.clear()
                data = os.read(fd, self.READ_CHUNK_SIZE)
                if not data:
                    return
                text = decoder.decode(data)
                if text:
                    await websocket.send_text(text)

        async def pump_input():
            """Forward client messages to the shell, applying resize requests."""
            while True:
                data = await websocket.receive_text()
                if data.startswith(self.RESIZE_PREFIX):
                    size = self._parse_resize(data)
                    if size is not None:
                        try:
                            self._set_winsize(fd, *size)
                        except OSError as e:
                            # Resizing is best-effort; keep the session alive.
                            print(f"Resize failed: {e}")
                    continue
                payload = data.encode()
                # os.write may accept only part of the buffer.
                while payload:
                    written = os.write(fd, payload)
                    payload = payload[written:]

        tasks = [
            asyncio.ensure_future(pump_output()),
            asyncio.ensure_future(pump_input()),
        ]
        try:
            done, _pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                task.result()  # re-raise whatever ended the session
        except (WebSocketDisconnect, OSError) as e:
            print(f"Connection closed: {e}")
        finally:
            for task in tasks:
                task.cancel()
            # Also retrieves any exception left on the other task.
            await asyncio.gather(*tasks, return_exceptions=True)
            loop.remove_reader(fd)
            os.close(fd)
            await self._reap(pid)
 
# Single manager instance shared by every WebSocket connection.
pty_manager = PTYManager()
 
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket route at /ws: delegates the connection to the shared PTY manager."""
    session = pty_manager.handle_websocket(websocket)
    await session
 
@app.get("/")
async def health_check():
    """GET / handler: returns a fixed status payload."""
    payload = {"status": "ok"}
    return payload