- FRE(): remove strpool_gc() call during expression evaluation — temporaries on the C stack would become dangling pointers after compaction - WAIT: reject mask=0 (would infinite-loop since AND 0 is always 0) - ENVIRON: use setenv()+free() instead of putenv() to avoid memory leak and dangling pointer if pool is compacted - portio: add bounds check to com1_inp() (com1_out already had one) - kernel: use CHR$(1) sentinel prefix to avoid collision with user output - kernel: fix INPUT prompt rfind() edge case when no newline in buffer - eval.c: enlarge DATE$ format buffer to silence -Wformat-truncation
576 lines
20 KiB
Python
576 lines
20 KiB
Python
"""Jupyter kernel for GW-BASIC 2026.
|
|
|
|
Persistent process model: a long-running gwbasic subprocess reads BASIC
|
|
commands from stdin and writes output to stdout. The kernel sends each
|
|
notebook cell as one or more lines of BASIC, followed by a sentinel
|
|
PRINT that emits a known delimiter. Output is captured until the
|
|
delimiter appears, then forwarded to the notebook.
|
|
|
|
GW-BASIC's piped mode suppresses the banner and "Ok" prompt automatically,
|
|
and stdout is unbuffered, making it ideal for subprocess communication.
|
|
|
|
Features beyond basic execution:
|
|
- Inline Sixel graphics: ESC P ... ESC \\ sequences are extracted from
|
|
the output stream, decoded into PNG images, and displayed inline.
|
|
- INPUT statement support: when gwbasic prints "? " (the INPUT prompt),
|
|
the kernel requests input from the notebook front-end via the Jupyter
|
|
stdin protocol.
|
|
- Pygments lexer: GW-BASIC syntax highlighting in notebooks.
|
|
- Tab completion for all GW-BASIC keywords.
|
|
"""
|
|
|
|
import base64
|
|
import io
|
|
import os
|
|
import re
|
|
import select
|
|
import shutil
|
|
import signal
|
|
import struct
|
|
import subprocess
|
|
import zlib
|
|
|
|
from ipykernel.kernelbase import Kernel
|
|
|
|
from . import __version__
|
|
|
|
_SENTINEL = '\x01GWDONE\x01'
|
|
_SENTINEL_CMD = 'PRINT CHR$(1)+"GWDONE"+CHR$(1)\n'
|
|
|
|
_ERROR_RE = re.compile(
|
|
r'(?:Syntax error|Illegal function call|Type mismatch|Out of |'
|
|
r'Division by zero|Overflow|RETURN without GOSUB|'
|
|
r'NEXT without FOR|Undefined |Subscript out of range|'
|
|
r'Redo from start|String too long|'
|
|
r'WHILE without WEND|WEND without WHILE|'
|
|
r'File not found|File already open|Bad file |'
|
|
r'Direct statement in file|Missing operand|'
|
|
r'Duplicate Definition|Formula too complex|'
|
|
r'Internal error|Unprintable error)',
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
# DCS (ESC P) starts Sixel, ST (ESC \) ends it — both as bytes
|
|
_DCS = b'\x1bP'
|
|
_ST = b'\x1b\\'
|
|
|
|
GW_KEYWORDS = [
|
|
'AUTO', 'BEEP', 'BLOAD', 'BSAVE', 'CALL', 'CHAIN', 'CHDIR',
|
|
'CIRCLE', 'CLEAR', 'CLOSE', 'CLS', 'COLOR', 'COM', 'COMMON',
|
|
'CONT', 'DATA', 'DATE$', 'DEF', 'DEFDBL', 'DEFINT', 'DEFSNG',
|
|
'DEFSTR', 'DELETE', 'DIM', 'DRAW', 'EDIT', 'ELSE', 'END',
|
|
'ENVIRON', 'ENVIRON$', 'ERASE', 'ERDEV', 'ERDEV$', 'ERR', 'ERL',
|
|
'ERROR', 'FIELD', 'FILES', 'FN', 'FOR', 'GET', 'GOSUB', 'GOTO',
|
|
'IF', 'INPUT', 'IOCTL', 'IOCTL$', 'KEY', 'KILL', 'LCOPY',
|
|
'LET', 'LINE', 'LIST', 'LLIST', 'LOAD', 'LOCATE', 'LPRINT',
|
|
'LSET', 'MERGE', 'MKDIR', 'MOTOR', 'NAME', 'NEW', 'NEXT',
|
|
'NOISE', 'ON', 'OPEN', 'OPTION', 'OUT', 'PAINT', 'PALETTE',
|
|
'PLAY', 'POKE', 'PRESET', 'PRINT', 'PSET', 'PUT', 'RANDOMIZE',
|
|
'READ', 'REM', 'RENUM', 'RESET', 'RESTORE', 'RESUME', 'RETURN',
|
|
'RMDIR', 'RSET', 'RUN', 'SAVE', 'SCREEN', 'SHELL', 'SOUND',
|
|
'STOP', 'SWAP', 'SYSTEM', 'THEN', 'TIME$', 'TIMER', 'TO',
|
|
'TRON', 'TROFF', 'VIEW', 'WAIT', 'WEND', 'WHILE', 'WIDTH',
|
|
'WINDOW', 'WRITE',
|
|
'ABS', 'ASC', 'ATN', 'CDBL', 'CHR$', 'CINT', 'COS', 'CSNG',
|
|
'CSRLIN', 'CVD', 'CVI', 'CVS', 'EOF', 'EXP', 'FIX', 'FRE',
|
|
'HEX$', 'INKEY$', 'INP', 'INPUT$', 'INSTR', 'INT', 'LEFT$',
|
|
'LEN', 'LOC', 'LOF', 'LOG', 'LPOS', 'MID$', 'MKD$', 'MKI$',
|
|
'MKS$', 'OCT$', 'PEEK', 'PEN', 'PMAP', 'POINT', 'POS',
|
|
'RIGHT$', 'RND', 'SGN', 'SIN', 'SPACE$', 'SPC', 'SQR',
|
|
'STICK', 'STR$', 'STRIG', 'STRING$', 'TAB', 'TAN', 'USR',
|
|
'VAL', 'VARPTR', 'VARPTR$',
|
|
'AND', 'EQV', 'IMP', 'MOD', 'NOT', 'OR', 'XOR', 'STEP',
|
|
]
|
|
|
|
|
|
# ---------- Sixel decoder (pure Python, no PIL dependency) ----------
|
|
|
|
def _parse_sixel_palette(data):
|
|
"""Parse #n;2;r;g;b palette definitions from Sixel data."""
|
|
palette = {}
|
|
for m in re.finditer(rb'#(\d+);2;(\d+);(\d+);(\d+)', data):
|
|
idx = int(m.group(1))
|
|
r = int(m.group(2)) * 255 // 100
|
|
g = int(m.group(3)) * 255 // 100
|
|
b = int(m.group(4)) * 255 // 100
|
|
palette[idx] = (r, g, b)
|
|
return palette
|
|
|
|
|
|
def _decode_sixel(data):
|
|
"""Decode Sixel data into an RGBA image. Returns (width, height, rgba_bytes)."""
|
|
# Strip DCS/ST wrappers if present
|
|
if data.startswith(_DCS):
|
|
data = data[len(_DCS):]
|
|
if data.endswith(_ST):
|
|
data = data[:-len(_ST)]
|
|
|
|
# Skip mode char (usually 'q')
|
|
if data and data[0:1] == b'q':
|
|
data = data[1:]
|
|
|
|
palette = _parse_sixel_palette(data)
|
|
if not palette:
|
|
palette = {0: (0, 0, 0)}
|
|
|
|
# First pass: determine image dimensions
|
|
x = 0
|
|
y = 0
|
|
max_x = 0
|
|
max_y = 0
|
|
color = 0
|
|
i = 0
|
|
while i < len(data):
|
|
ch = data[i]
|
|
if ch == ord('#'):
|
|
# Color selector or palette def — skip palette defs
|
|
j = i + 1
|
|
num = b''
|
|
while j < len(data) and (data[j] in b'0123456789'):
|
|
num += bytes([data[j]])
|
|
j += 1
|
|
if j < len(data) and data[j] == ord(';'):
|
|
# Palette definition — skip to end
|
|
while j < len(data) and data[j] not in b'#$-\x1b' and not (63 <= data[j] <= 126):
|
|
j += 1
|
|
i = j
|
|
continue
|
|
color = int(num) if num else 0
|
|
i = j
|
|
elif ch == ord('$'):
|
|
x = 0
|
|
i += 1
|
|
elif ch == ord('-'):
|
|
x = 0
|
|
y += 6
|
|
i += 1
|
|
elif ch == ord('!'):
|
|
# RLE: !count<char>
|
|
j = i + 1
|
|
num = b''
|
|
while j < len(data) and data[j] in b'0123456789':
|
|
num += bytes([data[j]])
|
|
j += 1
|
|
count = int(num) if num else 1
|
|
if j < len(data) and 63 <= data[j] <= 126:
|
|
x += count
|
|
j += 1
|
|
max_x = max(max_x, x)
|
|
max_y = max(max_y, y + 6)
|
|
i = j
|
|
elif 63 <= ch <= 126:
|
|
x += 1
|
|
max_x = max(max_x, x)
|
|
max_y = max(max_y, y + 6)
|
|
i += 1
|
|
else:
|
|
i += 1
|
|
|
|
if max_x == 0 or max_y == 0:
|
|
return 0, 0, b''
|
|
|
|
width, height = max_x, max_y
|
|
# RGBA buffer (transparent background)
|
|
pixels = bytearray(width * height * 4)
|
|
|
|
# Second pass: render pixels
|
|
x = 0
|
|
y = 0
|
|
color = 0
|
|
i = 0
|
|
while i < len(data):
|
|
ch = data[i]
|
|
if ch == ord('#'):
|
|
j = i + 1
|
|
num = b''
|
|
while j < len(data) and data[j] in b'0123456789':
|
|
num += bytes([data[j]])
|
|
j += 1
|
|
if j < len(data) and data[j] == ord(';'):
|
|
while j < len(data) and data[j] not in b'#$-\x1b' and not (63 <= data[j] <= 126):
|
|
j += 1
|
|
i = j
|
|
continue
|
|
color = int(num) if num else 0
|
|
i = j
|
|
elif ch == ord('$'):
|
|
x = 0
|
|
i += 1
|
|
elif ch == ord('-'):
|
|
x = 0
|
|
y += 6
|
|
i += 1
|
|
elif ch == ord('!'):
|
|
j = i + 1
|
|
num = b''
|
|
while j < len(data) and data[j] in b'0123456789':
|
|
num += bytes([data[j]])
|
|
j += 1
|
|
count = int(num) if num else 1
|
|
if j < len(data) and 63 <= data[j] <= 126:
|
|
sixel = data[j] - 63
|
|
j += 1
|
|
r, g, b = palette.get(color, (0, 0, 0))
|
|
for dx in range(count):
|
|
cx = x + dx
|
|
if cx >= width:
|
|
break
|
|
for bit in range(6):
|
|
if sixel & (1 << bit):
|
|
cy = y + bit
|
|
if cy < height:
|
|
off = (cy * width + cx) * 4
|
|
pixels[off] = r
|
|
pixels[off + 1] = g
|
|
pixels[off + 2] = b
|
|
pixels[off + 3] = 255
|
|
x += count
|
|
i = j
|
|
elif 63 <= ch <= 126:
|
|
sixel = ch - 63
|
|
r, g, b = palette.get(color, (0, 0, 0))
|
|
if x < width:
|
|
for bit in range(6):
|
|
if sixel & (1 << bit):
|
|
cy = y + bit
|
|
if cy < height:
|
|
off = (cy * width + x) * 4
|
|
pixels[off] = r
|
|
pixels[off + 1] = g
|
|
pixels[off + 2] = b
|
|
pixels[off + 3] = 255
|
|
x += 1
|
|
i += 1
|
|
else:
|
|
i += 1
|
|
|
|
return width, height, bytes(pixels)
|
|
|
|
|
|
def _rgba_to_png(width, height, rgba):
|
|
"""Encode raw RGBA pixels as PNG (pure Python, no dependencies)."""
|
|
def _chunk(tag, data):
|
|
c = tag + data
|
|
return struct.pack('>I', len(data)) + c + struct.pack('>I', zlib.crc32(c) & 0xFFFFFFFF)
|
|
|
|
ihdr = struct.pack('>IIBBBBB', width, height, 8, 6, 0, 0, 0)
|
|
raw = b''
|
|
stride = width * 4
|
|
for y in range(height):
|
|
raw += b'\x00' + rgba[y * stride:(y + 1) * stride]
|
|
idat = zlib.compress(raw)
|
|
|
|
out = b'\x89PNG\r\n\x1a\n'
|
|
out += _chunk(b'IHDR', ihdr)
|
|
out += _chunk(b'IDAT', idat)
|
|
out += _chunk(b'IEND', b'')
|
|
return out
|
|
|
|
|
|
# ---------- Kernel class ----------
|
|
|
|
class GWBasicKernel(Kernel):
|
|
implementation = 'gwbasickernel'
|
|
implementation_version = __version__
|
|
language = 'basic'
|
|
language_version = '2026'
|
|
language_info = {
|
|
'name': 'basic',
|
|
'mimetype': 'text/x-basic',
|
|
'file_extension': '.bas',
|
|
'codemirror_mode': 'vb',
|
|
'pygments_lexer': 'gwbasickernel.basic_lexer.GWBasicLexer',
|
|
}
|
|
banner = 'GW-BASIC 2026 Jupyter Kernel'
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self._proc = None
|
|
self._timeout = int(os.environ.get('GWBASIC_TIMEOUT', '30'))
|
|
|
|
def _find_binary(self):
|
|
env = os.environ.get('GWBASIC')
|
|
if env and os.path.isfile(env) and os.access(env, os.X_OK):
|
|
return env
|
|
for candidate in ['gwbasic', './build/gwbasic', './gwbasic']:
|
|
found = shutil.which(candidate)
|
|
if found:
|
|
return found
|
|
return None
|
|
|
|
def _start_process(self):
|
|
self._kill_process()
|
|
binary = self._find_binary()
|
|
if not binary:
|
|
return False
|
|
self._proc = subprocess.Popen(
|
|
[binary],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
bufsize=0,
|
|
)
|
|
return True
|
|
|
|
def _kill_process(self):
|
|
if self._proc is not None:
|
|
try:
|
|
self._proc.kill()
|
|
self._proc.wait(timeout=5)
|
|
except (ProcessLookupError, subprocess.TimeoutExpired):
|
|
pass
|
|
self._proc = None
|
|
|
|
def _is_alive(self):
|
|
return self._proc is not None and self._proc.poll() is None
|
|
|
|
def _send_interrupt_children(self):
|
|
if self._is_alive():
|
|
self._proc.send_signal(signal.SIGINT)
|
|
else:
|
|
super()._send_interrupt_children()
|
|
|
|
def _read_until_sentinel(self, allow_stdin=False):
|
|
"""Read raw bytes from stdout until sentinel or timeout.
|
|
|
|
When allow_stdin is True and the output ends with "? " (INPUT prompt),
|
|
the kernel requests input from the notebook front-end and writes the
|
|
response to the subprocess's stdin.
|
|
|
|
Returns (raw_bytes, status).
|
|
"""
|
|
fd = self._proc.stdout.fileno()
|
|
buf = b''
|
|
sentinel_bytes = _SENTINEL.encode()
|
|
|
|
while True:
|
|
try:
|
|
ready, _, _ = select.select([fd], [], [], self._timeout)
|
|
except (KeyboardInterrupt, InterruptedError):
|
|
if self._is_alive():
|
|
self._proc.send_signal(signal.SIGINT)
|
|
return buf, 'interrupted'
|
|
|
|
if not ready:
|
|
# Check for INPUT prompt before declaring timeout
|
|
if allow_stdin and buf.endswith(b'? '):
|
|
reply = self.raw_input('? ')
|
|
self._proc.stdin.write((reply + '\n').encode())
|
|
self._proc.stdin.flush()
|
|
continue
|
|
return buf, 'timeout'
|
|
|
|
chunk = os.read(fd, 8192)
|
|
if not chunk:
|
|
return buf, 'died'
|
|
|
|
buf += chunk
|
|
|
|
# Check for sentinel in the buffer
|
|
if sentinel_bytes in buf:
|
|
idx = buf.index(sentinel_bytes)
|
|
# Keep everything before the sentinel, stripping the
|
|
# PRINT output that contains it (newline + sentinel + newline)
|
|
before = buf[:idx]
|
|
# Remove trailing newline left by PRINT before sentinel
|
|
if before.endswith(b'\n'):
|
|
before = before[:-1]
|
|
return before, 'done'
|
|
|
|
# Check for INPUT prompt: "? " at end of buffer (no newline after)
|
|
if allow_stdin and buf.endswith(b'? '):
|
|
last_nl = buf.rfind(b'\n')
|
|
tail = buf[last_nl + 1:] if last_nl >= 0 else buf
|
|
if b'\n' not in tail[:-2]: # no embedded newline before "? "
|
|
prompt_line = tail.decode('utf-8', errors='replace')
|
|
reply = self.raw_input(prompt_line)
|
|
self._proc.stdin.write((reply + '\n').encode())
|
|
self._proc.stdin.flush()
|
|
|
|
def _split_sixel(self, raw):
|
|
"""Split raw output into (text_parts, sixel_blobs) interleaved."""
|
|
parts = []
|
|
pos = 0
|
|
while True:
|
|
dcs_idx = raw.find(_DCS, pos)
|
|
if dcs_idx == -1:
|
|
parts.append(('text', raw[pos:]))
|
|
break
|
|
if dcs_idx > pos:
|
|
parts.append(('text', raw[pos:dcs_idx]))
|
|
st_idx = raw.find(_ST, dcs_idx)
|
|
if st_idx == -1:
|
|
parts.append(('text', raw[pos:]))
|
|
break
|
|
end = st_idx + len(_ST)
|
|
parts.append(('sixel', raw[dcs_idx:end]))
|
|
pos = end
|
|
return parts
|
|
|
|
def _has_error(self, text):
|
|
return bool(_ERROR_RE.search(text))
|
|
|
|
def _handle_magic(self, code):
|
|
stripped = code.strip()
|
|
if stripped == '%reset':
|
|
self._kill_process()
|
|
return True, 'Session reset.'
|
|
if stripped.startswith('%timeout'):
|
|
parts = stripped.split()
|
|
if len(parts) == 2:
|
|
try:
|
|
self._timeout = int(parts[1])
|
|
return True, f'Timeout set to {self._timeout}s.'
|
|
except ValueError:
|
|
return True, 'Usage: %timeout <seconds>'
|
|
return True, f'Current timeout: {self._timeout}s. Usage: %timeout <seconds>'
|
|
if stripped == '%new':
|
|
if self._is_alive():
|
|
self._proc.stdin.write(b'NEW\n')
|
|
self._proc.stdin.write(_SENTINEL_CMD.encode())
|
|
self._proc.stdin.flush()
|
|
self._read_until_sentinel()
|
|
return True, 'Program cleared.'
|
|
return False, None
|
|
|
|
def do_execute(self, code, silent, store_history=True,
|
|
user_expressions=None, allow_stdin=False):
|
|
handled, response = self._handle_magic(code)
|
|
if handled:
|
|
if not silent and response:
|
|
self.send_response(self.iopub_socket, 'stream',
|
|
{'name': 'stdout', 'text': response + '\n'})
|
|
return {
|
|
'status': 'ok', 'execution_count': self.execution_count,
|
|
'payload': [], 'user_expressions': {},
|
|
}
|
|
|
|
if not code.strip():
|
|
return {
|
|
'status': 'ok', 'execution_count': self.execution_count,
|
|
'payload': [], 'user_expressions': {},
|
|
}
|
|
|
|
if not self._is_alive():
|
|
if not self._start_process():
|
|
if not silent:
|
|
self.send_response(self.iopub_socket, 'stream', {
|
|
'name': 'stderr',
|
|
'text': 'gwbasic binary not found.\n'
|
|
'Set GWBASIC env var or install gwbasic on PATH.\n',
|
|
})
|
|
return {
|
|
'status': 'error', 'execution_count': self.execution_count,
|
|
'ename': 'FileNotFoundError', 'evalue': 'gwbasic not found',
|
|
'traceback': [],
|
|
}
|
|
|
|
for line in code.split('\n'):
|
|
line = line.rstrip()
|
|
if line:
|
|
self._proc.stdin.write((line + '\n').encode())
|
|
|
|
self._proc.stdin.write(_SENTINEL_CMD.encode())
|
|
self._proc.stdin.flush()
|
|
|
|
raw, status = self._read_until_sentinel(allow_stdin=allow_stdin)
|
|
|
|
if status == 'interrupted':
|
|
if not silent:
|
|
self.send_response(self.iopub_socket, 'stream', {
|
|
'name': 'stderr', 'text': 'Execution interrupted.\n',
|
|
})
|
|
return {'status': 'abort', 'execution_count': self.execution_count}
|
|
|
|
if status == 'timeout':
|
|
self._kill_process()
|
|
if not silent:
|
|
self.send_response(self.iopub_socket, 'stream', {
|
|
'name': 'stderr',
|
|
'text': f'Execution timed out after {self._timeout}s. '
|
|
f'Use %timeout to increase.\n',
|
|
})
|
|
return {
|
|
'status': 'error', 'execution_count': self.execution_count,
|
|
'ename': 'TimeoutError',
|
|
'evalue': f'Timed out after {self._timeout}s',
|
|
'traceback': [],
|
|
}
|
|
|
|
if status == 'died':
|
|
self._proc = None
|
|
text = raw.decode('utf-8', errors='replace').strip()
|
|
if not silent and text:
|
|
self.send_response(self.iopub_socket, 'stream', {
|
|
'name': 'stderr',
|
|
'text': f'GW-BASIC process exited.\n{text}\n',
|
|
})
|
|
return {
|
|
'status': 'error', 'execution_count': self.execution_count,
|
|
'ename': 'RuntimeError', 'evalue': 'Process exited',
|
|
'traceback': [],
|
|
}
|
|
|
|
# Split output into text and Sixel graphics
|
|
parts = self._split_sixel(raw)
|
|
|
|
is_error = False
|
|
for kind, data in parts:
|
|
if kind == 'text':
|
|
text = data.decode('utf-8', errors='replace').strip()
|
|
if not text:
|
|
continue
|
|
if self._has_error(text):
|
|
is_error = True
|
|
if not silent:
|
|
self.send_response(self.iopub_socket, 'stream', {
|
|
'name': 'stderr' if is_error else 'stdout',
|
|
'text': text + '\n',
|
|
})
|
|
elif kind == 'sixel' and not silent:
|
|
w, h, rgba = _decode_sixel(data)
|
|
if w > 0 and h > 0:
|
|
png = _rgba_to_png(w, h, rgba)
|
|
b64 = base64.b64encode(png).decode('ascii')
|
|
self.send_response(self.iopub_socket, 'display_data', {
|
|
'data': {'image/png': b64},
|
|
'metadata': {'image/png': {'width': w, 'height': h}},
|
|
})
|
|
|
|
if is_error:
|
|
err_text = raw.decode('utf-8', errors='replace').strip()
|
|
return {
|
|
'status': 'error', 'execution_count': self.execution_count,
|
|
'ename': 'BASICError', 'evalue': err_text.split('\n')[0],
|
|
'traceback': err_text.split('\n'),
|
|
}
|
|
|
|
return {
|
|
'status': 'ok', 'execution_count': self.execution_count,
|
|
'payload': [], 'user_expressions': {},
|
|
}
|
|
|
|
def do_complete(self, code, cursor_pos):
|
|
text = code[:cursor_pos]
|
|
match = re.search(r'([A-Za-z_]\w*\$?)$', text)
|
|
if not match:
|
|
return {'matches': [], 'cursor_start': cursor_pos,
|
|
'cursor_end': cursor_pos, 'status': 'ok'}
|
|
|
|
token = match.group(1).upper()
|
|
start = cursor_pos - len(token)
|
|
matches = [kw for kw in GW_KEYWORDS if kw.startswith(token)]
|
|
return {
|
|
'matches': matches, 'cursor_start': start,
|
|
'cursor_end': cursor_pos, 'status': 'ok',
|
|
}
|
|
|
|
def do_is_complete(self, code):
|
|
if not code.strip():
|
|
return {'status': 'incomplete', 'indent': ''}
|
|
return {'status': 'complete'}
|
|
|
|
def do_shutdown(self, restart):
|
|
self._kill_process()
|
|
return {'status': 'ok', 'restart': restart}
|