mirror of
https://github.com/ihabunek/toot.git
synced 2024-06-16 06:15:25 +00:00
Compare commits
25 Commits
6267f20f2a
...
c639a2609c
Author | SHA1 | Date | |
---|---|---|---|
|
c639a2609c | ||
|
0fc2ec12f5 | ||
|
07ad41960f | ||
|
07beba8c68 | ||
|
7244b2718f | ||
|
968a516f76 | ||
|
38eca67905 | ||
|
1d48e64853 | ||
|
bf12dbff70 | ||
|
4b17e2e586 | ||
|
20968fe87f | ||
|
3bac9b2fb6 | ||
|
3420f1466a | ||
|
3eebbe35c9 | ||
|
4d5ac3cc4e | ||
|
ee98ce3746 | ||
|
0cbb8863b3 | ||
|
1709a416b3 | ||
|
f324aa119d | ||
|
43f51cbbb9 | ||
|
225dfbfb2e | ||
|
9ae205c548 | ||
|
9875209b30 | ||
|
965ffa1312 | ||
|
f5a465ff25 |
|
@ -3,6 +3,13 @@ Changelog
|
|||
|
||||
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
|
||||
|
||||
**0.42.0 (2024-03-09)**
|
||||
|
||||
* TUI: Add `toot tui --always-show-sensitive` option (thanks Lexi Winter)
|
||||
* TUI: Document missing shortcuts (thanks Denis Laxalde)
|
||||
* TUI: Use rounded boxes for nicer visuals (thanks Dan Schwarz)
|
||||
* TUI: Don't break if edited_at status field does not exist
|
||||
|
||||
**0.41.1 (2024-01-02)**
|
||||
|
||||
* Fix a crash in settings parsing code
|
||||
|
|
|
@ -1,3 +1,11 @@
|
|||
0.42.0:
|
||||
date: 2024-03-09
|
||||
changes:
|
||||
- "TUI: Add `toot tui --always-show-sensitive` option (thanks Lexi Winter)"
|
||||
- "TUI: Document missing shortcuts (thanks Denis Laxalde)"
|
||||
- "TUI: Use rounded boxes for nicer visuals (thanks Dan Schwarz)"
|
||||
- "TUI: Don't break if edited_at status field does not exist"
|
||||
|
||||
0.41.1:
|
||||
date: 2024-01-02
|
||||
changes:
|
||||
|
|
|
@ -3,6 +3,13 @@ Changelog
|
|||
|
||||
<!-- Do not edit. This file is automatically generated from changelog.yaml.-->
|
||||
|
||||
**0.42.0 (2024-03-09)**
|
||||
|
||||
* TUI: Add `toot tui --always-show-sensitive` option (thanks Lexi Winter)
|
||||
* TUI: Document missing shortcuts (thanks Denis Laxalde)
|
||||
* TUI: Use rounded boxes for nicer visuals (thanks Dan Schwarz)
|
||||
* TUI: Don't break if edited_at status field does not exist
|
||||
|
||||
**0.41.1 (2024-01-02)**
|
||||
|
||||
* Fix a crash in settings parsing code
|
||||
|
|
10
setup.py
10
setup.py
|
@ -12,7 +12,7 @@ and blocking accounts and other actions.
|
|||
|
||||
setup(
|
||||
name='toot',
|
||||
version='0.41.1',
|
||||
version='0.42.0',
|
||||
description='Mastodon CLI client',
|
||||
long_description=long_description.strip(),
|
||||
author='Ivan Habunek',
|
||||
|
@ -39,9 +39,14 @@ setup(
|
|||
"beautifulsoup4>=4.5.0,<5.0",
|
||||
"wcwidth>=0.1.7",
|
||||
"urwid>=2.0.0,<3.0",
|
||||
"tomlkit>=0.10.0,<1.0"
|
||||
"tomlkit>=0.10.0,<1.0",
|
||||
],
|
||||
extras_require={
|
||||
# Required to display images in the TUI
|
||||
"images": [
|
||||
"pillow>=9.5.0",
|
||||
"term-image==0.7.0",
|
||||
],
|
||||
# Required to display rich text in the TUI
|
||||
"richtext": [
|
||||
"urwidgets>=0.1,<0.2"
|
||||
|
@ -60,6 +65,7 @@ setup(
|
|||
"setuptools",
|
||||
"vermin",
|
||||
"typing-extensions",
|
||||
"pillow>=9.5.0",
|
||||
],
|
||||
},
|
||||
entry_points={
|
||||
|
|
42
tests/README.md
Normal file
42
tests/README.md
Normal file
|
@ -0,0 +1,42 @@
|
|||
Testing toot
|
||||
============
|
||||
|
||||
This document is WIP.
|
||||
|
||||
Mastodon
|
||||
--------
|
||||
|
||||
TODO
|
||||
|
||||
Pleroma
|
||||
-------
|
||||
|
||||
TODO
|
||||
|
||||
Akkoma
|
||||
------
|
||||
|
||||
Install using the guide here:
|
||||
https://docs.akkoma.dev/stable/installation/docker_en/
|
||||
|
||||
Disable captcha and throttling by adding this to `config/prod.exs`:
|
||||
|
||||
```ex
|
||||
# Disable captcha for testing
|
||||
config :pleroma, Pleroma.Captcha,
|
||||
enabled: false
|
||||
|
||||
# Disable rate limiting for testing
|
||||
config :pleroma, :rate_limit,
|
||||
authentication: nil,
|
||||
timeline: nil,
|
||||
search: nil,
|
||||
app_account_creation: nil,
|
||||
relations_actions: nil,
|
||||
relation_id_action: nil,
|
||||
statuses_actions: nil,
|
||||
status_id_action: nil,
|
||||
password_reset: nil,
|
||||
account_confirmation_resend: nil,
|
||||
ap_routes: nil
|
||||
```
|
|
@ -41,6 +41,8 @@ TRUMPET = str(Path(__file__).parent.parent.parent / "trumpet.png")
|
|||
|
||||
ASSETS_DIR = str(Path(__file__).parent.parent / "assets")
|
||||
|
||||
PASSWORD = "83dU29170rjKilKQQwuWhJv3PKnSW59bWx0perjP6i7Nu4rkeh4mRfYuvVLYM3fM"
|
||||
|
||||
|
||||
def create_app(base_url):
|
||||
instance = api.get_instance(base_url).json()
|
||||
|
@ -52,7 +54,7 @@ def register_account(app: App):
|
|||
username = str(uuid.uuid4())[-10:]
|
||||
email = f"{username}@example.com"
|
||||
|
||||
response = api.register_account(app, username, email, "password", "en")
|
||||
response = api.register_account(app, username, email, PASSWORD, "en")
|
||||
return User(app.instance, username, response["access_token"])
|
||||
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import json
|
||||
from tests.integration.conftest import register_account
|
||||
|
||||
from toot import App, User, api, cli
|
||||
from toot.entities import Account, Relationship, from_dict
|
||||
|
@ -35,9 +36,8 @@ def test_whois(app: App, friend: User, run):
|
|||
assert f"@{friend.username}" in result.stdout
|
||||
|
||||
|
||||
def test_following(app: App, user: User, friend: User, friend_id, run):
|
||||
# Make sure we're not initially following friend
|
||||
api.unfollow(app, user, friend_id)
|
||||
def test_following(app: App, user: User, run):
|
||||
friend = register_account(app)
|
||||
|
||||
result = run(cli.accounts.following, user.username)
|
||||
assert result.exit_code == 0
|
||||
|
@ -84,9 +84,8 @@ def test_following_not_found(run):
|
|||
assert result.stderr.strip() == "Error: Account not found"
|
||||
|
||||
|
||||
def test_following_json(app: App, user: User, friend: User, user_id, friend_id, run_json):
|
||||
# Make sure we're not initially following friend
|
||||
api.unfollow(app, user, friend_id)
|
||||
def test_following_json(app: App, user: User, user_id, run_json):
|
||||
friend = register_account(app)
|
||||
|
||||
result = run_json(cli.accounts.following, user.username, "--json")
|
||||
assert result == []
|
||||
|
@ -96,24 +95,26 @@ def test_following_json(app: App, user: User, friend: User, user_id, friend_id,
|
|||
|
||||
result = run_json(cli.accounts.follow, friend.username, "--json")
|
||||
relationship = from_dict(Relationship, result)
|
||||
assert relationship.id == friend_id
|
||||
assert relationship.following is True
|
||||
|
||||
[result] = run_json(cli.accounts.following, user.username, "--json")
|
||||
relationship = from_dict(Relationship, result)
|
||||
assert relationship.id == friend_id
|
||||
account = from_dict(Account, result)
|
||||
assert account.acct == friend.username
|
||||
|
||||
# If no account is given defaults to logged in user
|
||||
[result] = run_json(cli.accounts.following, user.username, "--json")
|
||||
relationship = from_dict(Relationship, result)
|
||||
assert relationship.id == friend_id
|
||||
[result] = run_json(cli.accounts.following, "--json")
|
||||
account = from_dict(Account, result)
|
||||
assert account.acct == friend.username
|
||||
|
||||
assert relationship.following is True
|
||||
|
||||
[result] = run_json(cli.accounts.followers, friend.username, "--json")
|
||||
assert result["id"] == user_id
|
||||
account = from_dict(Account, result)
|
||||
assert account.acct == user.username
|
||||
|
||||
result = run_json(cli.accounts.unfollow, friend.username, "--json")
|
||||
assert result["id"] == friend_id
|
||||
assert result["following"] is False
|
||||
relationship = from_dict(Relationship, result)
|
||||
assert relationship.following is False
|
||||
|
||||
result = run_json(cli.accounts.following, user.username, "--json")
|
||||
assert result == []
|
||||
|
@ -200,9 +201,8 @@ def test_mute_json(app: App, user: User, friend: User, run_json, friend_id):
|
|||
assert result == []
|
||||
|
||||
|
||||
def test_block(app, user, friend, friend_id, run):
|
||||
# Make sure we're not initially blocking friend
|
||||
api.unblock(app, user, friend_id)
|
||||
def test_block(app, user, run):
|
||||
friend = register_account(app)
|
||||
|
||||
result = run(cli.accounts.blocked)
|
||||
assert result.exit_code == 0
|
||||
|
|
|
@ -3,7 +3,7 @@ from unittest import mock
|
|||
from unittest.mock import MagicMock
|
||||
|
||||
from toot import User, cli
|
||||
from tests.integration.conftest import Run
|
||||
from tests.integration.conftest import PASSWORD, Run
|
||||
|
||||
# TODO: figure out how to test login
|
||||
|
||||
|
@ -89,7 +89,7 @@ def test_login_cli(
|
|||
cli.auth.login_cli,
|
||||
"--instance", "http://localhost:3000",
|
||||
"--email", f"{user.username}@example.com",
|
||||
"--password", "password",
|
||||
"--password", PASSWORD,
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "✓ Successfully logged in." in result.stdout
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
import click
|
||||
import pytest
|
||||
import sys
|
||||
|
||||
from toot.cli.validators import validate_duration
|
||||
from toot.wcstring import wc_wrap, trunc, pad, fit_text
|
||||
from toot.tui.utils import LRUCache
|
||||
from PIL import Image
|
||||
from collections import namedtuple
|
||||
from toot.utils import urlencode_url
|
||||
|
||||
|
||||
|
@ -207,6 +211,111 @@ def test_duration():
|
|||
duration("banana")
|
||||
|
||||
|
||||
def test_cache_null():
|
||||
"""Null dict is null."""
|
||||
cache = LRUCache(cache_max_bytes=1024)
|
||||
assert cache.__len__() == 0
|
||||
|
||||
|
||||
Case = namedtuple("Case", ["cache_len", "len", "init"])
|
||||
|
||||
img = Image.new('RGB', (100, 100))
|
||||
img_size = sys.getsizeof(img.tobytes())
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"case",
|
||||
[
|
||||
Case(9, 0, []),
|
||||
Case(9, 1, [("one", img)]),
|
||||
Case(9, 2, [("one", img), ("two", img)]),
|
||||
Case(2, 2, [("one", img), ("two", img)]),
|
||||
Case(1, 1, [("one", img), ("two", img)]),
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize("method", ["assign", "init"])
|
||||
def test_cache_init(case, method):
|
||||
"""Check that the # of elements is right, given # given and cache_len."""
|
||||
if method == "init":
|
||||
cache = LRUCache(case.init, cache_max_bytes=img_size * case.cache_len)
|
||||
elif method == "assign":
|
||||
cache = LRUCache(cache_max_bytes=img_size * case.cache_len)
|
||||
for (key, val) in case.init:
|
||||
cache[key] = val
|
||||
else:
|
||||
assert False
|
||||
|
||||
# length is max(#entries, cache_len)
|
||||
assert cache.__len__() == case.len
|
||||
|
||||
# make sure the first entry is the one ejected
|
||||
if case.cache_len > 1 and case.init:
|
||||
assert "one" in cache.keys()
|
||||
else:
|
||||
assert "one" not in cache.keys()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("method", ["init", "assign"])
|
||||
def test_cache_overflow_default(method):
|
||||
"""Test default overflow logic."""
|
||||
if method == "init":
|
||||
cache = LRUCache([("one", img), ("two", img), ("three", img)], cache_max_bytes=img_size * 2)
|
||||
elif method == "assign":
|
||||
cache = LRUCache(cache_max_bytes=img_size * 2)
|
||||
cache["one"] = img
|
||||
cache["two"] = img
|
||||
cache["three"] = img
|
||||
else:
|
||||
assert False
|
||||
|
||||
assert "one" not in cache.keys()
|
||||
assert "two" in cache.keys()
|
||||
assert "three" in cache.keys()
|
||||
|
||||
@pytest.mark.parametrize("mode", ["get", "set"])
|
||||
@pytest.mark.parametrize("add_third", [False, True])
|
||||
def test_cache_lru_overflow(mode, add_third):
|
||||
img = Image.new('RGB', (100, 100))
|
||||
img_size = sys.getsizeof(img.tobytes())
|
||||
|
||||
"""Test that key access resets LRU logic."""
|
||||
|
||||
cache = LRUCache([("one", img), ("two", img)], cache_max_bytes=img_size * 2)
|
||||
|
||||
if mode == "get":
|
||||
dummy = cache["one"]
|
||||
elif mode == "set":
|
||||
cache["one"] = img
|
||||
else:
|
||||
assert False
|
||||
|
||||
if add_third:
|
||||
cache["three"] = img
|
||||
|
||||
assert "one" in cache.keys()
|
||||
assert "two" not in cache.keys()
|
||||
assert "three" in cache.keys()
|
||||
else:
|
||||
assert "one" in cache.keys()
|
||||
assert "two" in cache.keys()
|
||||
assert "three" not in cache.keys()
|
||||
|
||||
|
||||
def test_cache_keyerror():
|
||||
cache = LRUCache()
|
||||
with pytest.raises(KeyError):
|
||||
cache["foo"]
|
||||
|
||||
|
||||
def test_cache_miss_doesnt_eject():
|
||||
cache = LRUCache([("one", img), ("two", img)], cache_max_bytes=img_size * 3)
|
||||
with pytest.raises(KeyError):
|
||||
cache["foo"]
|
||||
|
||||
assert len(cache) == 2
|
||||
assert "one" in cache.keys()
|
||||
assert "two" in cache.keys()
|
||||
|
||||
def test_urlencode_url():
|
||||
assert urlencode_url("https://www.example.com") == "https://www.example.com"
|
||||
assert urlencode_url("https://www.example.com/url%20with%20spaces") == "https://www.example.com/url%20with%20spaces"
|
||||
|
||||
|
|
|
@ -3,28 +3,13 @@ Helpers for testing.
|
|||
"""
|
||||
|
||||
import time
|
||||
from typing import Any, Callable
|
||||
from typing import Callable, TypeVar
|
||||
|
||||
|
||||
class MockResponse:
|
||||
def __init__(self, response_data={}, ok=True, is_redirect=False):
|
||||
self.response_data = response_data
|
||||
self.content = response_data
|
||||
self.ok = ok
|
||||
self.is_redirect = is_redirect
|
||||
|
||||
def raise_for_status(self):
|
||||
pass
|
||||
|
||||
def json(self):
|
||||
return self.response_data
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def retval(val):
|
||||
return lambda *args, **kwargs: val
|
||||
|
||||
|
||||
def run_with_retries(fn: Callable[..., Any]):
|
||||
def run_with_retries(fn: Callable[..., T]) -> T:
|
||||
"""
|
||||
Run the the given function repeatedly until it finishes without raising an
|
||||
AssertionError. Sleep a bit between attempts. If the function doesn't
|
||||
|
@ -41,4 +26,4 @@ def run_with_retries(fn: Callable[..., Any]):
|
|||
except AssertionError:
|
||||
time.sleep(delay)
|
||||
|
||||
fn()
|
||||
return fn()
|
||||
|
|
|
@ -4,7 +4,7 @@ import sys
|
|||
from os.path import join, expanduser
|
||||
from typing import NamedTuple
|
||||
|
||||
__version__ = '0.41.1'
|
||||
__version__ = '0.42.0'
|
||||
|
||||
|
||||
class App(NamedTuple):
|
||||
|
|
|
@ -22,7 +22,7 @@ T = t.TypeVar("T")
|
|||
|
||||
PRIVACY_CHOICES = ["public", "unlisted", "private"]
|
||||
VISIBILITY_CHOICES = ["public", "unlisted", "private", "direct"]
|
||||
|
||||
IMAGE_FORMAT_CHOICES = ["block", "iterm", "kitty"]
|
||||
TUI_COLORS = {
|
||||
"1": 1,
|
||||
"16": 16,
|
||||
|
|
|
@ -3,6 +3,7 @@ import json as pyjson
|
|||
|
||||
from toot import api, config
|
||||
from toot.cli import Context, cli, pass_context, json_option
|
||||
from toot.entities import from_dict_list, List
|
||||
from toot.output import print_list_accounts, print_lists, print_warning
|
||||
|
||||
|
||||
|
@ -18,7 +19,8 @@ def lists(ctx: click.Context):
|
|||
if not user or not app:
|
||||
raise click.ClickException("This command requires you to be logged in.")
|
||||
|
||||
lists = api.get_lists(app, user)
|
||||
data = api.get_lists(app, user)
|
||||
lists = from_dict_list(List, data)
|
||||
if lists:
|
||||
print_lists(lists)
|
||||
else:
|
||||
|
@ -30,12 +32,13 @@ def lists(ctx: click.Context):
|
|||
@pass_context
|
||||
def list(ctx: Context, json: bool):
|
||||
"""List all your lists"""
|
||||
lists = api.get_lists(ctx.app, ctx.user)
|
||||
data = api.get_lists(ctx.app, ctx.user)
|
||||
|
||||
if json:
|
||||
click.echo(pyjson.dumps(lists))
|
||||
click.echo(pyjson.dumps(data))
|
||||
else:
|
||||
if lists:
|
||||
if data:
|
||||
lists = from_dict_list(List, data)
|
||||
print_lists(lists)
|
||||
else:
|
||||
click.echo("You have no lists defined.")
|
||||
|
|
|
@ -145,7 +145,7 @@ def post(
|
|||
else:
|
||||
user, app = ctx.user, ctx.app
|
||||
|
||||
media_ids = _upload_media(ctx.app, ctx.user, media, descriptions, thumbnails)
|
||||
media_ids = _upload_media(app, user, media, descriptions, thumbnails)
|
||||
status_text = _get_status_text(text, editor, media)
|
||||
scheduled_at = _get_scheduled_at(scheduled_at, scheduled_in)
|
||||
|
||||
|
|
|
@ -111,7 +111,10 @@ def bookmarks(
|
|||
|
||||
|
||||
@cli.command()
|
||||
@click.option("--clear", help="Dismiss all notifications and exit")
|
||||
@click.option(
|
||||
"--clear", is_flag=True,
|
||||
help="Dismiss all notifications and exit"
|
||||
)
|
||||
@click.option(
|
||||
"--reverse", "-r", is_flag=True,
|
||||
help="Reverse the order of the shown notifications (newest on top)"
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import click
|
||||
|
||||
from typing import Optional
|
||||
from toot.cli import TUI_COLORS, VISIBILITY_CHOICES, Context, cli, pass_context
|
||||
from toot.cli.validators import validate_tui_colors
|
||||
from toot.cli import TUI_COLORS, VISIBILITY_CHOICES, IMAGE_FORMAT_CHOICES, Context, cli, pass_context
|
||||
from toot.cli.validators import validate_tui_colors, validate_cache_size
|
||||
from toot.tui.app import TUI, TuiOptions
|
||||
|
||||
COLOR_OPTIONS = ", ".join(TUI_COLORS.keys())
|
||||
|
@ -24,16 +24,27 @@ COLOR_OPTIONS = ", ".join(TUI_COLORS.keys())
|
|||
help=f"""Number of colors to use, one of {COLOR_OPTIONS}, defaults to 16 if
|
||||
using --color, and 1 if using --no-color."""
|
||||
)
|
||||
@click.option(
|
||||
"-s", "--cache-size",
|
||||
callback=validate_cache_size,
|
||||
help="""Specify the image cache maximum size in megabytes. Default: 10MB.
|
||||
Minimum: 1MB."""
|
||||
)
|
||||
@click.option(
|
||||
"-v", "--default-visibility",
|
||||
type=click.Choice(VISIBILITY_CHOICES),
|
||||
help="Default visibility when posting new toots; overrides the server-side preference"
|
||||
)
|
||||
@click.option(
|
||||
"-S", "--always-show-sensitive",
|
||||
"-s", "--always-show-sensitive",
|
||||
is_flag=True,
|
||||
help="Expand toots with content warnings automatically"
|
||||
)
|
||||
@click.option(
|
||||
"-f", "--image-format",
|
||||
type=click.Choice(IMAGE_FORMAT_CHOICES),
|
||||
help="Image output format; support varies across terminals. Default: block"
|
||||
)
|
||||
@pass_context
|
||||
def tui(
|
||||
ctx: Context,
|
||||
|
@ -41,7 +52,9 @@ def tui(
|
|||
media_viewer: Optional[str],
|
||||
always_show_sensitive: bool,
|
||||
relative_datetimes: bool,
|
||||
default_visibility: Optional[str]
|
||||
cache_size: Optional[int],
|
||||
default_visibility: Optional[str],
|
||||
image_format: Optional[str]
|
||||
):
|
||||
"""Launches the toot terminal user interface"""
|
||||
if colors is None:
|
||||
|
@ -51,8 +64,10 @@ def tui(
|
|||
colors=colors,
|
||||
media_viewer=media_viewer,
|
||||
relative_datetimes=relative_datetimes,
|
||||
cache_size=cache_size,
|
||||
default_visibility=default_visibility,
|
||||
always_show_sensitive=always_show_sensitive,
|
||||
image_format=image_format,
|
||||
)
|
||||
tui = TUI.create(ctx.app, ctx.user, options)
|
||||
tui.run()
|
||||
|
|
|
@ -73,3 +73,21 @@ def validate_tui_colors(ctx, param, value) -> Optional[int]:
|
|||
return TUI_COLORS[value]
|
||||
|
||||
raise click.BadParameter(f"Invalid value: {value}. Expected one of: {', '.join(TUI_COLORS)}")
|
||||
|
||||
|
||||
def validate_cache_size(ctx: click.Context, param: str, value: Optional[str]) -> Optional[int]:
|
||||
"""validates the cache size parameter"""
|
||||
|
||||
if value is None:
|
||||
return 1024 * 1024 * 10 # default 10MB
|
||||
else:
|
||||
if value.isdigit():
|
||||
size = int(value)
|
||||
else:
|
||||
raise click.BadParameter("Cache size must be numeric.")
|
||||
|
||||
if size > 1024:
|
||||
raise click.BadParameter("Cache size too large: 1024MB maximum.")
|
||||
elif size < 1:
|
||||
raise click.BadParameter("Cache size too small: 1MB minimum.")
|
||||
return size
|
||||
|
|
|
@ -17,11 +17,11 @@ def get_config_file_path():
|
|||
return join(get_config_dir(), TOOT_CONFIG_FILE_NAME)
|
||||
|
||||
|
||||
def user_id(user):
|
||||
def user_id(user: User):
|
||||
return "{}@{}".format(user.username, user.instance)
|
||||
|
||||
|
||||
def make_config(path):
|
||||
def make_config(path: str):
|
||||
"""Creates an empty toot configuration file."""
|
||||
config = {
|
||||
"apps": {},
|
||||
|
@ -58,7 +58,7 @@ def save_config(config):
|
|||
return json.dump(config, f, indent=True, sort_keys=True)
|
||||
|
||||
|
||||
def extract_user_app(config, user_id):
|
||||
def extract_user_app(config, user_id: str):
|
||||
if user_id not in config['users']:
|
||||
return None, None
|
||||
|
||||
|
@ -82,7 +82,7 @@ def get_active_user_app():
|
|||
return None, None
|
||||
|
||||
|
||||
def get_user_app(user_id):
|
||||
def get_user_app(user_id: str):
|
||||
"""Returns (User, App) for given user ID or (None, None) if user is not logged in."""
|
||||
return extract_user_app(load_config(), user_id)
|
||||
|
||||
|
@ -93,7 +93,7 @@ def load_app(instance: str) -> Optional[App]:
|
|||
return App(**config['apps'][instance])
|
||||
|
||||
|
||||
def load_user(user_id, throw=False):
|
||||
def load_user(user_id: str, throw=False):
|
||||
config = load_config()
|
||||
|
||||
if user_id in config['users']:
|
||||
|
@ -120,7 +120,7 @@ def save_app(app: App):
|
|||
config['apps'][app.instance] = app._asdict()
|
||||
|
||||
|
||||
def delete_app(config, app):
|
||||
def delete_app(config, app: App):
|
||||
with edit_config() as config:
|
||||
config['apps'].pop(app.instance, None)
|
||||
|
||||
|
|
|
@ -9,11 +9,12 @@ different versions of the Mastodon API.
|
|||
"""
|
||||
|
||||
import dataclasses
|
||||
import typing as t
|
||||
|
||||
from dataclasses import dataclass, is_dataclass
|
||||
from datetime import date, datetime
|
||||
from functools import lru_cache
|
||||
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
|
||||
from typing import Any, Dict, Optional, Tuple, Type, TypeVar, Union
|
||||
from typing import get_type_hints
|
||||
|
||||
from toot.typing_compat import get_args, get_origin
|
||||
|
@ -59,8 +60,8 @@ class Account:
|
|||
header: str
|
||||
header_static: str
|
||||
locked: bool
|
||||
fields: List[AccountField]
|
||||
emojis: List[CustomEmoji]
|
||||
fields: t.List[AccountField]
|
||||
emojis: t.List[CustomEmoji]
|
||||
bot: bool
|
||||
group: bool
|
||||
discoverable: Optional[bool]
|
||||
|
@ -154,10 +155,10 @@ class Poll:
|
|||
multiple: bool
|
||||
votes_count: int
|
||||
voters_count: Optional[int]
|
||||
options: List[PollOption]
|
||||
emojis: List[CustomEmoji]
|
||||
options: t.List[PollOption]
|
||||
emojis: t.List[CustomEmoji]
|
||||
voted: Optional[bool]
|
||||
own_votes: Optional[List[int]]
|
||||
own_votes: Optional[t.List[int]]
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -207,11 +208,11 @@ class Filter:
|
|||
"""
|
||||
id: str
|
||||
title: str
|
||||
context: List[str]
|
||||
context: t.List[str]
|
||||
expires_at: Optional[datetime]
|
||||
filter_action: str
|
||||
keywords: List[FilterKeyword]
|
||||
statuses: List[FilterStatus]
|
||||
keywords: t.List[FilterKeyword]
|
||||
statuses: t.List[FilterStatus]
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -220,7 +221,7 @@ class FilterResult:
|
|||
https://docs.joinmastodon.org/entities/FilterResult/
|
||||
"""
|
||||
filter: Filter
|
||||
keyword_matches: Optional[List[str]]
|
||||
keyword_matches: Optional[t.List[str]]
|
||||
status_matches: Optional[str]
|
||||
|
||||
|
||||
|
@ -237,11 +238,11 @@ class Status:
|
|||
visibility: str
|
||||
sensitive: bool
|
||||
spoiler_text: str
|
||||
media_attachments: List[MediaAttachment]
|
||||
media_attachments: t.List[MediaAttachment]
|
||||
application: Optional[Application]
|
||||
mentions: List[StatusMention]
|
||||
tags: List[StatusTag]
|
||||
emojis: List[CustomEmoji]
|
||||
mentions: t.List[StatusMention]
|
||||
tags: t.List[StatusTag]
|
||||
emojis: t.List[CustomEmoji]
|
||||
reblogs_count: int
|
||||
favourites_count: int
|
||||
replies_count: int
|
||||
|
@ -259,7 +260,7 @@ class Status:
|
|||
muted: Optional[bool]
|
||||
bookmarked: Optional[bool]
|
||||
pinned: Optional[bool]
|
||||
filtered: Optional[List[FilterResult]]
|
||||
filtered: Optional[t.List[FilterResult]]
|
||||
|
||||
@property
|
||||
def original(self) -> "Status":
|
||||
|
@ -289,8 +290,8 @@ class Report:
|
|||
comment: str
|
||||
forwarded: bool
|
||||
created_at: datetime
|
||||
status_ids: Optional[List[str]]
|
||||
rule_ids: Optional[List[str]]
|
||||
status_ids: Optional[t.List[str]]
|
||||
rule_ids: Optional[t.List[str]]
|
||||
target_account: Account
|
||||
|
||||
|
||||
|
@ -328,7 +329,7 @@ class InstanceConfigurationStatuses:
|
|||
|
||||
@dataclass
|
||||
class InstanceConfigurationMediaAttachments:
|
||||
supported_mime_types: List[str]
|
||||
supported_mime_types: t.List[str]
|
||||
image_size_limit: int
|
||||
image_matrix_limit: int
|
||||
video_size_limit: int
|
||||
|
@ -377,13 +378,13 @@ class Instance:
|
|||
urls: InstanceUrls
|
||||
stats: InstanceStats
|
||||
thumbnail: Optional[str]
|
||||
languages: List[str]
|
||||
languages: t.List[str]
|
||||
registrations: bool
|
||||
approval_required: bool
|
||||
invites_enabled: bool
|
||||
configuration: InstanceConfiguration
|
||||
contact_account: Optional[Account]
|
||||
rules: List[Rule]
|
||||
rules: t.List[Rule]
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -397,7 +398,7 @@ class Relationship:
|
|||
following: bool
|
||||
showing_reblogs: bool
|
||||
notifying: bool
|
||||
languages: List[str]
|
||||
languages: t.List[str]
|
||||
followed_by: bool
|
||||
blocking: bool
|
||||
blocked_by: bool
|
||||
|
@ -428,7 +429,7 @@ class Tag:
|
|||
"""
|
||||
name: str
|
||||
url: str
|
||||
history: List[TagHistory]
|
||||
history: t.List[TagHistory]
|
||||
following: Optional[bool]
|
||||
|
||||
|
||||
|
@ -445,6 +446,19 @@ class FeaturedTag:
|
|||
last_status_at: datetime
|
||||
|
||||
|
||||
@dataclass
|
||||
class List:
|
||||
"""
|
||||
Represents a list of some users that the authenticated user follows.
|
||||
https://docs.joinmastodon.org/entities/List/
|
||||
"""
|
||||
id: str
|
||||
title: str
|
||||
# This is a required field on Mastodon, but not supported on Pleroma/Akkoma
|
||||
# see: https://git.pleroma.social/pleroma/pleroma/-/issues/2918
|
||||
replies_policy: Optional[str]
|
||||
|
||||
|
||||
# Generic data class instance
|
||||
T = TypeVar("T")
|
||||
|
||||
|
@ -481,7 +495,7 @@ def from_dict(cls: Type[T], data: Dict) -> T:
|
|||
|
||||
|
||||
@lru_cache(maxsize=100)
|
||||
def get_fields(cls: Type) -> List[Tuple[str, Type, Any]]:
|
||||
def get_fields(cls: Type) -> t.List[Tuple[str, Type, Any]]:
|
||||
hints = get_type_hints(cls)
|
||||
return [
|
||||
(
|
||||
|
@ -493,7 +507,7 @@ def get_fields(cls: Type) -> List[Tuple[str, Type, Any]]:
|
|||
]
|
||||
|
||||
|
||||
def from_dict_list(cls: Type[T], data: List[Dict]) -> List[T]:
|
||||
def from_dict_list(cls: Type[T], data: t.List[Dict]) -> t.List[T]:
|
||||
return [from_dict(cls, x) for x in data]
|
||||
|
||||
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
import click
|
||||
import re
|
||||
import textwrap
|
||||
import shutil
|
||||
import textwrap
|
||||
import typing as t
|
||||
|
||||
from toot.entities import Account, Instance, Notification, Poll, Status
|
||||
from toot.entities import Account, Instance, Notification, Poll, Status, List
|
||||
from toot.utils import get_text, html_to_paragraphs
|
||||
from toot.wcstring import wc_wrap
|
||||
from typing import Any, Generator, Iterable, List
|
||||
from wcwidth import wcswidth
|
||||
|
||||
|
||||
|
@ -38,7 +38,7 @@ def instance_to_text(instance: Instance, width: int) -> str:
|
|||
return "\n".join(instance_lines(instance, width))
|
||||
|
||||
|
||||
def instance_lines(instance: Instance, width: int) -> Generator[str, None, None]:
|
||||
def instance_lines(instance: Instance, width: int) -> t.Generator[str, None, None]:
|
||||
yield f"{green(instance.title)}"
|
||||
yield f"{blue(instance.uri)}"
|
||||
yield f"running Mastodon {instance.version}"
|
||||
|
@ -78,7 +78,7 @@ def account_to_text(account: Account, width: int) -> str:
|
|||
return "\n".join(account_lines(account, width))
|
||||
|
||||
|
||||
def account_lines(account: Account, width: int) -> Generator[str, None, None]:
|
||||
def account_lines(account: Account, width: int) -> t.Generator[str, None, None]:
|
||||
acct = f"@{account.acct}"
|
||||
since = account.created_at.strftime("%Y-%m-%d")
|
||||
|
||||
|
@ -119,13 +119,13 @@ def print_tag_list(tags):
|
|||
click.echo(f"* {format_tag_name(tag)}\t{tag['url']}")
|
||||
|
||||
|
||||
def print_lists(lists):
|
||||
def print_lists(lists: t.List[List]):
|
||||
headers = ["ID", "Title", "Replies"]
|
||||
data = [[lst["id"], lst["title"], lst["replies_policy"]] for lst in lists]
|
||||
data = [[lst.id, lst.title, lst.replies_policy or ""] for lst in lists]
|
||||
print_table(headers, data)
|
||||
|
||||
|
||||
def print_table(headers: List[str], data: List[List[str]]):
|
||||
def print_table(headers: t.List[str], data: t.List[t.List[str]]):
|
||||
widths = [[len(cell) for cell in row] for row in data + [headers]]
|
||||
widths = [max(width) for width in zip(*widths)]
|
||||
|
||||
|
@ -178,7 +178,7 @@ def status_to_text(status: Status, width: int) -> str:
|
|||
return "\n".join(status_lines(status))
|
||||
|
||||
|
||||
def status_lines(status: Status) -> Generator[str, None, None]:
|
||||
def status_lines(status: Status) -> t.Generator[str, None, None]:
|
||||
width = get_width()
|
||||
status_id = status.id
|
||||
in_reply_to_id = status.in_reply_to_id
|
||||
|
@ -219,10 +219,10 @@ def status_lines(status: Status) -> Generator[str, None, None]:
|
|||
|
||||
reply = f"↲ In reply to {yellow(in_reply_to_id)} " if in_reply_to_id else ""
|
||||
boost = f"↻ {blue(reblogged_by_acct)} boosted " if reblogged_by else ""
|
||||
yield f"ID {yellow(status_id)} {reply} {boost}"
|
||||
yield f"ID {yellow(status_id)} Visibility: {status.visibility} {reply} {boost}"
|
||||
|
||||
|
||||
def html_lines(html: str, width: int) -> Generator[str, None, None]:
|
||||
def html_lines(html: str, width: int) -> t.Generator[str, None, None]:
|
||||
first = True
|
||||
for paragraph in html_to_paragraphs(html):
|
||||
if not first:
|
||||
|
@ -233,7 +233,7 @@ def html_lines(html: str, width: int) -> Generator[str, None, None]:
|
|||
first = False
|
||||
|
||||
|
||||
def poll_lines(poll: Poll) -> Generator[str, None, None]:
|
||||
def poll_lines(poll: Poll) -> t.Generator[str, None, None]:
|
||||
for idx, option in enumerate(poll.options):
|
||||
perc = (round(100 * option.votes_count / poll.votes_count)
|
||||
if poll.votes_count and option.votes_count is not None else 0)
|
||||
|
@ -258,7 +258,7 @@ def poll_lines(poll: Poll) -> Generator[str, None, None]:
|
|||
yield poll_footer
|
||||
|
||||
|
||||
def print_timeline(items: Iterable[Status]):
|
||||
def print_timeline(items: t.Iterable[Status]):
|
||||
print_divider()
|
||||
for item in items:
|
||||
print_status(item)
|
||||
|
@ -272,7 +272,7 @@ def print_notification(notification: Notification):
|
|||
print_status(notification.status)
|
||||
|
||||
|
||||
def print_notifications(notifications: List[Notification]):
|
||||
def print_notifications(notifications: t.List[Notification]):
|
||||
for notification in notifications:
|
||||
if notification.type not in ['pleroma:emoji_reaction']:
|
||||
print_divider()
|
||||
|
@ -316,25 +316,25 @@ def format_account_name(account: Account) -> str:
|
|||
|
||||
# Shorthand functions for coloring output
|
||||
|
||||
def blue(text: Any) -> str:
|
||||
def blue(text: t.Any) -> str:
|
||||
return click.style(text, fg="blue")
|
||||
|
||||
|
||||
def bold(text: Any) -> str:
|
||||
def bold(text: t.Any) -> str:
|
||||
return click.style(text, bold=True)
|
||||
|
||||
|
||||
def cyan(text: Any) -> str:
|
||||
def cyan(text: t.Any) -> str:
|
||||
return click.style(text, fg="cyan")
|
||||
|
||||
|
||||
def dim(text: Any) -> str:
|
||||
def dim(text: t.Any) -> str:
|
||||
return click.style(text, dim=True)
|
||||
|
||||
|
||||
def green(text: Any) -> str:
|
||||
def green(text: t.Any) -> str:
|
||||
return click.style(text, fg="green")
|
||||
|
||||
|
||||
def yellow(text: Any) -> str:
|
||||
def yellow(text: t.Any) -> str:
|
||||
return click.style(text, fg="yellow")
|
||||
|
|
114
toot/tui/app.py
114
toot/tui/app.py
|
@ -2,6 +2,7 @@ import logging
|
|||
import subprocess
|
||||
import urwid
|
||||
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import NamedTuple, Optional
|
||||
from datetime import datetime, timezone
|
||||
|
@ -15,11 +16,12 @@ from toot.utils.datetime import parse_datetime
|
|||
from .compose import StatusComposer
|
||||
from .constants import PALETTE
|
||||
from .entities import Status
|
||||
from .images import TuiScreen, load_image
|
||||
from .overlays import ExceptionStackTrace, GotoMenu, Help, StatusSource, StatusLinks, StatusZoom
|
||||
from .overlays import StatusDeleteConfirmation, Account
|
||||
from .poll import Poll
|
||||
from .timeline import Timeline
|
||||
from .utils import get_max_toot_chars, parse_content_links, copy_to_clipboard
|
||||
from .utils import get_max_toot_chars, parse_content_links, copy_to_clipboard, LRUCache
|
||||
from .widgets import ModalBox, RoundedLineBox
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -35,7 +37,9 @@ class TuiOptions(NamedTuple):
|
|||
media_viewer: Optional[str]
|
||||
always_show_sensitive: bool
|
||||
relative_datetimes: bool
|
||||
default_visibility: Optional[bool]
|
||||
cache_size: int
|
||||
default_visibility: Optional[str]
|
||||
image_format: Optional[str]
|
||||
|
||||
|
||||
class Header(urwid.WidgetWrap):
|
||||
|
@ -62,9 +66,13 @@ class Header(urwid.WidgetWrap):
|
|||
|
||||
|
||||
class Footer(urwid.Pile):
|
||||
def __init__(self):
|
||||
def __init__(self, tui):
|
||||
self.tui = tui
|
||||
self.status = urwid.Text("")
|
||||
self.message = urwid.Text("")
|
||||
self.command = Command(tui)
|
||||
|
||||
urwid.connect_signal(self.command, "close", self.end_command)
|
||||
|
||||
return super().__init__([
|
||||
urwid.AttrMap(self.status, "footer_status"),
|
||||
|
@ -86,6 +94,60 @@ class Footer(urwid.Pile):
|
|||
def clear_message(self):
|
||||
self.message.set_text("")
|
||||
|
||||
def start_command(self):
|
||||
self.clear_message()
|
||||
self.command.set_edit_text("")
|
||||
self.contents[1] = (self.command, ("weight", 1))
|
||||
self.focus_position = 1
|
||||
|
||||
def end_command(self, widget, success, message):
|
||||
self.contents[1] = (self.message, ("weight", 1))
|
||||
self.tui.focus_body()
|
||||
|
||||
if message:
|
||||
if success:
|
||||
self.set_message(message)
|
||||
else:
|
||||
self.set_error_message(message)
|
||||
|
||||
|
||||
class Command(urwid.Edit):
|
||||
"""Allows execution of vim-like commands in the footer"""
|
||||
signals = ["close"]
|
||||
|
||||
tui: "TUI"
|
||||
|
||||
def __init__(self, tui):
|
||||
self.tui = tui
|
||||
super().__init__(":")
|
||||
|
||||
def keypress(self, size, key):
|
||||
logger.debug((size, key))
|
||||
|
||||
if key == "enter":
|
||||
self.run_command()
|
||||
|
||||
if key == "esc":
|
||||
self.close()
|
||||
|
||||
return super().keypress(size, key)
|
||||
|
||||
def close(self, success=True, message=None):
|
||||
self._emit("close", success, message)
|
||||
|
||||
def run_command(self):
|
||||
command = self.get_edit_text()
|
||||
|
||||
if command in ("q", "quit"):
|
||||
raise urwid.ExitMainLoop()
|
||||
|
||||
elif command in ("h", "help"):
|
||||
self.tui.show_help()
|
||||
self.close()
|
||||
|
||||
else:
|
||||
self.close(False, f"Unknown command: {command}")
|
||||
|
||||
|
||||
class TUI(urwid.Frame):
|
||||
"""Main TUI frame."""
|
||||
|
@ -95,7 +157,7 @@ class TUI(urwid.Frame):
|
|||
@staticmethod
|
||||
def create(app: App, user: User, args: TuiOptions):
|
||||
"""Factory method, sets up TUI and an event loop."""
|
||||
screen = urwid.raw_display.Screen()
|
||||
screen = TuiScreen()
|
||||
screen.set_terminal_properties(args.colors)
|
||||
|
||||
tui = TUI(app, user, screen, args)
|
||||
|
@ -130,7 +192,7 @@ class TUI(urwid.Frame):
|
|||
# Show intro screen while toots are being loaded
|
||||
self.body = self.build_intro()
|
||||
self.header = Header(app, user)
|
||||
self.footer = Footer()
|
||||
self.footer = Footer(self)
|
||||
self.footer.set_status("Loading...")
|
||||
|
||||
# Default max status length, updated on startup
|
||||
|
@ -144,6 +206,11 @@ class TUI(urwid.Frame):
|
|||
self.followed_accounts = []
|
||||
self.preferences = {}
|
||||
|
||||
if self.options.cache_size:
|
||||
self.cache_max = 1024 * 1024 * self.options.cache_size
|
||||
else:
|
||||
self.cache_max = 1024 * 1024 * 10 # default 10MB
|
||||
|
||||
super().__init__(self.body, header=self.header, footer=self.footer)
|
||||
|
||||
def run(self):
|
||||
|
@ -327,8 +394,10 @@ class TUI(urwid.Frame):
|
|||
# get the major version number of the server
|
||||
# this works for Mastodon and Pleroma version strings
|
||||
# Mastodon versions < 4 do not have translation service
|
||||
# If the version is missing, assume 0 as a fallback
|
||||
# Revisit this logic if Pleroma implements translation
|
||||
ch = instance["version"][0]
|
||||
version = instance["version"]
|
||||
ch = "0" if not version else version[0]
|
||||
self.can_translate = int(ch) > 3 if ch.isnumeric() else False
|
||||
|
||||
return self.run_in_thread(_load_instance, done_callback=_done)
|
||||
|
@ -646,7 +715,7 @@ class TUI(urwid.Frame):
|
|||
account = api.whois(self.app, self.user, account_id)
|
||||
relationship = api.get_relationship(self.app, self.user, account_id)
|
||||
self.open_overlay(
|
||||
widget=Account(self.app, self.user, account, relationship),
|
||||
widget=Account(self.app, self.user, account, relationship, self.options),
|
||||
title="Account",
|
||||
)
|
||||
|
||||
|
@ -755,6 +824,27 @@ class TUI(urwid.Frame):
|
|||
|
||||
return self.run_in_thread(_delete, done_callback=_done)
|
||||
|
||||
def async_load_image(self, timeline, status, path, placeholder_index):
|
||||
def _load():
|
||||
# don't bother loading images for statuses we are not viewing now
|
||||
if timeline.get_focused_status().id != status.id:
|
||||
return
|
||||
|
||||
if not hasattr(timeline, "images"):
|
||||
timeline.images = LRUCache(cache_max_bytes=self.cache_max)
|
||||
|
||||
img = load_image(path)
|
||||
if img:
|
||||
timeline.images[str(hash(path))] = img
|
||||
|
||||
def _done(loop):
|
||||
# don't bother loading images for statuses we are not viewing now
|
||||
if timeline.get_focused_status().id != status.id:
|
||||
return
|
||||
timeline.update_status_image(status, path, placeholder_index)
|
||||
|
||||
return self.run_in_thread(_load, done_callback=_done)
|
||||
|
||||
def copy_status(self, status):
|
||||
# TODO: copy a better version of status content
|
||||
# including URLs
|
||||
|
@ -820,6 +910,12 @@ class TUI(urwid.Frame):
|
|||
|
||||
self.async_load_timeline(is_initial=True, timeline_name=self.timeline.name)
|
||||
|
||||
def focus_footer(self):
|
||||
self.focus_part = "footer"
|
||||
|
||||
def focus_body(self):
|
||||
self.focus_part = "body"
|
||||
|
||||
# --- Keys -----------------------------------------------------------------
|
||||
|
||||
def unhandled_input(self, key):
|
||||
|
@ -854,3 +950,7 @@ class TUI(urwid.Frame):
|
|||
self.close_overlay()
|
||||
else:
|
||||
raise urwid.ExitMainLoop()
|
||||
|
||||
elif key == ":" and not self.overlay:
|
||||
self.focus_footer()
|
||||
self.footer.start_command()
|
||||
|
|
|
@ -53,7 +53,7 @@ class Status:
|
|||
self.id = self.data["id"]
|
||||
self.account = self._get_account()
|
||||
self.created_at = parse_datetime(data["created_at"])
|
||||
if data["edited_at"]:
|
||||
if data.get("edited_at"):
|
||||
self.edited_at = parse_datetime(data["edited_at"])
|
||||
else:
|
||||
self.edited_at = None
|
||||
|
|
104
toot/tui/images.py
Normal file
104
toot/tui/images.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
import urwid
|
||||
import math
|
||||
import requests
|
||||
import warnings
|
||||
|
||||
# If term_image is loaded use their screen implementation which handles images
|
||||
try:
|
||||
from term_image.widget import UrwidImageScreen, UrwidImage
|
||||
from term_image.image import BaseImage, KittyImage, ITerm2Image, BlockImage
|
||||
from term_image import disable_queries # prevent phantom keystrokes
|
||||
from PIL import Image, ImageDraw
|
||||
|
||||
TuiScreen = UrwidImageScreen
|
||||
disable_queries()
|
||||
|
||||
def image_support_enabled():
|
||||
return True
|
||||
|
||||
def can_render_pixels(image_format):
|
||||
return image_format in ['kitty', 'iterm']
|
||||
|
||||
def get_base_image(image, image_format) -> BaseImage:
|
||||
# we don't autodetect kitty, iterm; we choose based on option switches
|
||||
BaseImage.forced_support = True
|
||||
if image_format == 'kitty':
|
||||
return KittyImage(image)
|
||||
elif image_format == 'iterm':
|
||||
return ITerm2Image(image)
|
||||
else:
|
||||
return BlockImage(image)
|
||||
|
||||
def resize_image(basewidth: int, baseheight: int, img: Image.Image) -> Image.Image:
|
||||
if baseheight and not basewidth:
|
||||
hpercent = baseheight / float(img.size[1])
|
||||
width = math.ceil(img.size[0] * hpercent)
|
||||
img = img.resize((width, baseheight), Image.Resampling.LANCZOS)
|
||||
elif basewidth and not baseheight:
|
||||
wpercent = (basewidth / float(img.size[0]))
|
||||
hsize = int((float(img.size[1]) * float(wpercent)))
|
||||
img = img.resize((basewidth, hsize), Image.Resampling.LANCZOS)
|
||||
else:
|
||||
img = img.resize((basewidth, baseheight), Image.Resampling.LANCZOS)
|
||||
|
||||
if img.mode != 'P':
|
||||
img = img.convert('RGB')
|
||||
return img
|
||||
|
||||
def add_corners(img, rad):
|
||||
circle = Image.new('L', (rad * 2, rad * 2), 0)
|
||||
draw = ImageDraw.Draw(circle)
|
||||
draw.ellipse((0, 0, rad * 2, rad * 2), fill=255)
|
||||
alpha = Image.new('L', img.size, "white")
|
||||
w, h = img.size
|
||||
alpha.paste(circle.crop((0, 0, rad, rad)), (0, 0))
|
||||
alpha.paste(circle.crop((0, rad, rad, rad * 2)), (0, h - rad))
|
||||
alpha.paste(circle.crop((rad, 0, rad * 2, rad)), (w - rad, 0))
|
||||
alpha.paste(circle.crop((rad, rad, rad * 2, rad * 2)), (w - rad, h - rad))
|
||||
img.putalpha(alpha)
|
||||
return img
|
||||
|
||||
def load_image(url):
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore") # suppress "corrupt exif" output from PIL
|
||||
try:
|
||||
img = Image.open(requests.get(url, stream=True).raw)
|
||||
if img.format == 'PNG' and img.mode != 'RGBA':
|
||||
img = img.convert("RGBA")
|
||||
return img
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def graphics_widget(img, image_format="block", corner_radius=0) -> urwid.Widget:
|
||||
if not img:
|
||||
return urwid.SolidFill(fill_char=" ")
|
||||
|
||||
if can_render_pixels(image_format) and corner_radius > 0:
|
||||
render_img = add_corners(img, 10)
|
||||
else:
|
||||
render_img = img
|
||||
|
||||
return UrwidImage(get_base_image(render_img, image_format), '<', upscale=True)
|
||||
# "<" means left-justify the image
|
||||
|
||||
except ImportError:
|
||||
from urwid.raw_display import Screen
|
||||
TuiScreen = Screen
|
||||
|
||||
def image_support_enabled():
|
||||
return False
|
||||
|
||||
def can_render_pixels(image_format: str):
|
||||
return False
|
||||
|
||||
def get_base_image(image, image_format: str):
|
||||
return None
|
||||
|
||||
def add_corners(img, rad):
|
||||
return None
|
||||
|
||||
def load_image(url):
|
||||
return None
|
||||
|
||||
def graphics_widget(img, image_format="block", corner_radius=0) -> urwid.Widget:
|
||||
return urwid.SolidFill(fill_char=" ")
|
|
@ -5,7 +5,9 @@ import webbrowser
|
|||
|
||||
from toot import __version__
|
||||
from toot import api
|
||||
|
||||
from toot.tui.utils import highlight_keys
|
||||
from toot.tui.images import image_support_enabled, load_image, graphics_widget
|
||||
from toot.tui.widgets import Button, EditBox, SelectableText
|
||||
from toot.tui.richtext import html_to_widgets
|
||||
|
||||
|
@ -242,11 +244,12 @@ class Help(urwid.Padding):
|
|||
|
||||
class Account(urwid.ListBox):
|
||||
"""Shows account data and provides various actions"""
|
||||
def __init__(self, app, user, account, relationship):
|
||||
def __init__(self, app, user, account, relationship, options):
|
||||
self.app = app
|
||||
self.user = user
|
||||
self.account = account
|
||||
self.relationship = relationship
|
||||
self.options = options
|
||||
self.last_action = None
|
||||
self.setup_listbox()
|
||||
|
||||
|
@ -255,6 +258,30 @@ class Account(urwid.ListBox):
|
|||
walker = urwid.SimpleListWalker(actions)
|
||||
super().__init__(walker)
|
||||
|
||||
def account_header(self, account):
|
||||
if image_support_enabled() and account['avatar'] and not account["avatar"].endswith("missing.png"):
|
||||
img = load_image(account['avatar'])
|
||||
aimg = urwid.BoxAdapter(
|
||||
graphics_widget(img, image_format=self.options.image_format, corner_radius=10), 10)
|
||||
else:
|
||||
aimg = urwid.BoxAdapter(urwid.SolidFill(" "), 10)
|
||||
|
||||
if image_support_enabled() and account['header'] and not account["header"].endswith("missing.png"):
|
||||
img = load_image(account['header'])
|
||||
|
||||
himg = (urwid.BoxAdapter(
|
||||
graphics_widget(img, image_format=self.options.image_format, corner_radius=10), 10))
|
||||
else:
|
||||
himg = urwid.BoxAdapter(urwid.SolidFill(" "), 10)
|
||||
|
||||
atxt = urwid.Pile([urwid.Divider(),
|
||||
(urwid.Text(("account", account["display_name"]))),
|
||||
(urwid.Text(("highlight", "@" + self.account['acct'])))])
|
||||
columns = urwid.Columns([aimg, ("weight", 9999, himg)], dividechars=2, min_width=20)
|
||||
|
||||
header = urwid.Pile([columns, urwid.Divider(), atxt])
|
||||
return header
|
||||
|
||||
def generate_contents(self, account, relationship=None, last_action=None):
|
||||
if self.last_action and not self.last_action.startswith("Confirm"):
|
||||
yield Button(f"Confirm {self.last_action}", on_press=take_action, user_data=self)
|
||||
|
@ -276,11 +303,11 @@ class Account(urwid.ListBox):
|
|||
|
||||
yield urwid.Divider("─")
|
||||
yield urwid.Divider()
|
||||
yield urwid.Text([("account", f"@{account['acct']}"), f" {account['display_name']}"])
|
||||
|
||||
yield self.account_header(account)
|
||||
|
||||
if account["note"]:
|
||||
yield urwid.Divider()
|
||||
|
||||
widgetlist = html_to_widgets(account["note"])
|
||||
for line in widgetlist:
|
||||
yield (line)
|
||||
|
|
|
@ -60,7 +60,10 @@ def html_to_widgets(html, recovery_attempt=False) -> List[urwid.Widget]:
|
|||
|
||||
|
||||
def url_to_widget(url: str):
|
||||
try:
|
||||
widget = len(url), urwid.Filler(Hyperlink(url, "link", url))
|
||||
except ValueError:
|
||||
widget = len(url), urwid.Filler(urwid.Text(url)) # don't style as link
|
||||
return TextEmbed(widget)
|
||||
|
||||
|
||||
|
@ -98,10 +101,16 @@ def text_to_widget(attr, markup) -> urwid.Widget:
|
|||
if match:
|
||||
label, url = match.groups()
|
||||
anchor_attr = get_best_anchor_attr(attr_list)
|
||||
try:
|
||||
markup_list.append((
|
||||
len(label),
|
||||
urwid.Filler(Hyperlink(url, anchor_attr, label)),
|
||||
))
|
||||
except ValueError:
|
||||
markup_list.append((
|
||||
len(label),
|
||||
urwid.Filler(urwid.Text(url)), # don't style as link
|
||||
))
|
||||
else:
|
||||
markup_list.append(run)
|
||||
else:
|
||||
|
|
|
@ -1,26 +1,33 @@
|
|||
import logging
|
||||
import math
|
||||
import urwid
|
||||
import webbrowser
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
from toot.tui import app
|
||||
|
||||
from toot.tui.richtext import html_to_widgets, url_to_widget
|
||||
from toot.utils.datetime import parse_datetime, time_ago
|
||||
from toot.utils.language import language_name
|
||||
|
||||
from toot.entities import Status
|
||||
from toot.tui.scroll import Scrollable, ScrollBar
|
||||
|
||||
from toot.tui.utils import highlight_keys
|
||||
from toot.tui.images import image_support_enabled, graphics_widget, can_render_pixels
|
||||
from toot.tui.widgets import SelectableText, SelectableColumns, RoundedLineBox
|
||||
|
||||
|
||||
logger = logging.getLogger("toot")
|
||||
screen = urwid.raw_display.Screen()
|
||||
|
||||
|
||||
class Timeline(urwid.Columns):
|
||||
"""
|
||||
Displays a list of statuses to the left, and status details on the right.
|
||||
"""
|
||||
|
||||
signals = [
|
||||
"close", # Close thread
|
||||
"focus", # Focus changed
|
||||
|
@ -41,6 +48,7 @@ class Timeline(urwid.Columns):
|
|||
self.is_thread = is_thread
|
||||
self.statuses = statuses
|
||||
self.status_list = self.build_status_list(statuses, focus=focus)
|
||||
self.can_render_pixels = can_render_pixels(self.tui.options.image_format)
|
||||
|
||||
try:
|
||||
focused_status = statuses[focus]
|
||||
|
@ -141,6 +149,16 @@ class Timeline(urwid.Columns):
|
|||
def modified(self):
|
||||
"""Called when the list focus switches to a new status"""
|
||||
status, index, count = self.get_focused_status_with_counts()
|
||||
|
||||
if image_support_enabled:
|
||||
clear_op = getattr(self.tui.screen, "clear_images", None)
|
||||
# term-image's screen implementation has clear_images(),
|
||||
# urwid's implementation does not.
|
||||
# TODO: it would be nice not to check this each time thru
|
||||
|
||||
if callable(clear_op):
|
||||
self.tui.screen.clear_images()
|
||||
|
||||
self.draw_status_details(status)
|
||||
self._emit("focus")
|
||||
|
||||
|
@ -282,7 +300,7 @@ class Timeline(urwid.Columns):
|
|||
|
||||
def get_status_index(self, id):
|
||||
# TODO: This is suboptimal, consider a better way
|
||||
for n, status in enumerate(self.statuses):
|
||||
for n, status in enumerate(self.statuses.copy()):
|
||||
if status.id == id:
|
||||
return n
|
||||
raise ValueError("Status with ID {} not found".format(id))
|
||||
|
@ -306,6 +324,27 @@ class Timeline(urwid.Columns):
|
|||
if index == self.status_list.body.focus:
|
||||
self.draw_status_details(status)
|
||||
|
||||
def update_status_image(self, status, path, placeholder_index):
|
||||
"""Replace image placeholder with image widget and redraw"""
|
||||
index = self.get_status_index(status.id)
|
||||
assert self.statuses[index].id == status.id # Sanity check
|
||||
|
||||
# get the image and replace the placeholder with a graphics widget
|
||||
img = None
|
||||
if hasattr(self, "images"):
|
||||
try:
|
||||
img = self.images[(str(hash(path)))]
|
||||
except KeyError:
|
||||
pass
|
||||
if img:
|
||||
try:
|
||||
status.placeholders[placeholder_index]._set_original_widget(
|
||||
graphics_widget(img, image_format=self.tui.options.image_format, corner_radius=10))
|
||||
|
||||
except IndexError:
|
||||
# ignore IndexErrors.
|
||||
pass
|
||||
|
||||
def remove_status(self, status):
|
||||
index = self.get_status_index(status.id)
|
||||
assert self.statuses[index].id == status.id # Sanity check
|
||||
|
@ -318,6 +357,9 @@ class Timeline(urwid.Columns):
|
|||
class StatusDetails(urwid.Pile):
|
||||
def __init__(self, timeline: Timeline, status: Optional[Status]):
|
||||
self.status = status
|
||||
self.timeline = timeline
|
||||
if self.status:
|
||||
self.status.placeholders = []
|
||||
self.followed_accounts = timeline.tui.followed_accounts
|
||||
self.options = timeline.tui.options
|
||||
|
||||
|
@ -326,17 +368,83 @@ class StatusDetails(urwid.Pile):
|
|||
if status else ())
|
||||
return super().__init__(widget_list)
|
||||
|
||||
def image_widget(self, path, rows=None, aspect=None) -> urwid.Widget:
|
||||
"""Returns a widget capable of displaying the image
|
||||
|
||||
path is required; URL to image
|
||||
rows, if specfied, sets a fixed number of rows. Or:
|
||||
aspect, if specified, calculates rows based on pane width
|
||||
and the aspect ratio provided"""
|
||||
|
||||
if not rows:
|
||||
if not aspect:
|
||||
aspect = 3 / 2 # reasonable default
|
||||
|
||||
screen_rows = screen.get_cols_rows()[1]
|
||||
if self.timeline.can_render_pixels:
|
||||
# for pixel-rendered images,
|
||||
# image rows should be 33% of the available screen
|
||||
# but in no case fewer than 10
|
||||
rows = max(10, math.floor(screen_rows * .33))
|
||||
else:
|
||||
# for cell-rendered images,
|
||||
# use the max available columns
|
||||
# and calculate rows based on the image
|
||||
# aspect ratio
|
||||
cols = math.floor(0.55 * screen.get_cols_rows()[0])
|
||||
rows = math.ceil((cols / 2) / aspect)
|
||||
# if the calculated rows are more than will
|
||||
# fit on one screen, reduce to one screen of rows
|
||||
rows = min(screen_rows - 6, rows)
|
||||
|
||||
# but in no case fewer than 10 rows
|
||||
rows = max(rows, 10)
|
||||
|
||||
img = None
|
||||
if hasattr(self.timeline, "images"):
|
||||
try:
|
||||
img = self.timeline.images[(str(hash(path)))]
|
||||
except KeyError:
|
||||
pass
|
||||
if img:
|
||||
return (urwid.BoxAdapter(
|
||||
graphics_widget(img, image_format=self.timeline.tui.options.image_format, corner_radius=10), rows))
|
||||
else:
|
||||
placeholder = urwid.BoxAdapter(urwid.SolidFill(fill_char=" "), rows)
|
||||
self.status.placeholders.append(placeholder)
|
||||
if image_support_enabled():
|
||||
self.timeline.tui.async_load_image(self.timeline, self.status, path, len(self.status.placeholders) - 1)
|
||||
return placeholder
|
||||
|
||||
def author_header(self, reblogged_by):
|
||||
avatar_url = self.status.original.data["account"]["avatar"]
|
||||
|
||||
if avatar_url and image_support_enabled():
|
||||
aimg = self.image_widget(avatar_url, 2)
|
||||
|
||||
account_color = ("highlight" if self.status.original.author.account in
|
||||
self.timeline.tui.followed_accounts else "account")
|
||||
|
||||
atxt = urwid.Pile([("pack", urwid.Text(("bold", self.status.original.author.display_name))),
|
||||
("pack", urwid.Text((account_color, self.status.original.author.account)))])
|
||||
|
||||
if image_support_enabled():
|
||||
columns = urwid.Columns([aimg, ("weight", 9999, atxt)], dividechars=1, min_width=5)
|
||||
else:
|
||||
columns = urwid.Columns([("weight", 9999, atxt)], dividechars=1, min_width=5)
|
||||
|
||||
return columns
|
||||
|
||||
def content_generator(self, status, reblogged_by):
|
||||
if reblogged_by:
|
||||
text = "♺ {} boosted".format(reblogged_by.display_name or reblogged_by.username)
|
||||
yield ("pack", urwid.Text(("dim", text)))
|
||||
reblogger_name = (reblogged_by.display_name
|
||||
if reblogged_by.display_name
|
||||
else reblogged_by.username)
|
||||
text = f"♺ {reblogger_name} boosted"
|
||||
yield urwid.Text(("dim", text))
|
||||
yield ("pack", urwid.AttrMap(urwid.Divider("-"), "dim"))
|
||||
|
||||
if status.author.display_name:
|
||||
yield ("pack", urwid.Text(("bold", status.author.display_name)))
|
||||
|
||||
account_color = "highlight" if status.author.account in self.followed_accounts else "account"
|
||||
yield ("pack", urwid.Text((account_color, status.author.account)))
|
||||
yield self.author_header(reblogged_by)
|
||||
yield ("pack", urwid.Divider())
|
||||
|
||||
if status.data["spoiler_text"]:
|
||||
|
@ -363,6 +471,26 @@ class StatusDetails(urwid.Pile):
|
|||
yield ("pack", urwid.Text([("bold", "Media attachment"), " (", m["type"], ")"]))
|
||||
if m["description"]:
|
||||
yield ("pack", urwid.Text(m["description"]))
|
||||
if m["url"]:
|
||||
if m["url"].lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp')):
|
||||
yield urwid.Text("")
|
||||
try:
|
||||
aspect = float(m["meta"]["original"]["aspect"])
|
||||
except Exception:
|
||||
aspect = None
|
||||
if image_support_enabled():
|
||||
yield self.image_widget(m["url"], aspect=aspect)
|
||||
yield urwid.Divider()
|
||||
# video media may include a preview URL, show that as a fallback
|
||||
elif m["preview_url"].lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp')):
|
||||
yield urwid.Text("")
|
||||
try:
|
||||
aspect = float(m["meta"]["small"]["aspect"])
|
||||
except Exception:
|
||||
aspect = None
|
||||
if image_support_enabled():
|
||||
yield self.image_widget(m["preview_url"], aspect=aspect)
|
||||
yield urwid.Divider()
|
||||
yield ("pack", url_to_widget(m["url"]))
|
||||
|
||||
poll = status.original.data.get("poll")
|
||||
|
@ -427,6 +555,15 @@ class StatusDetails(urwid.Pile):
|
|||
yield urwid.Text("")
|
||||
yield url_to_widget(card["url"])
|
||||
|
||||
if card["image"] and image_support_enabled():
|
||||
if card["image"].lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.svg', '.webp')):
|
||||
yield urwid.Text("")
|
||||
try:
|
||||
aspect = int(card["width"]) / int(card["height"])
|
||||
except Exception:
|
||||
aspect = None
|
||||
yield self.image_widget(card["image"], aspect=aspect)
|
||||
|
||||
def poll_generator(self, poll):
|
||||
for idx, option in enumerate(poll["options"]):
|
||||
perc = (round(100 * option["votes_count"] / poll["votes_count"])
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
import base64
|
||||
import re
|
||||
import sys
|
||||
import urwid
|
||||
|
||||
from collections import OrderedDict
|
||||
from functools import reduce
|
||||
from html.parser import HTMLParser
|
||||
from typing import List
|
||||
|
@ -109,3 +110,33 @@ def deep_get(adict: dict, path: List[str], default=None):
|
|||
path,
|
||||
adict
|
||||
)
|
||||
|
||||
|
||||
class LRUCache(OrderedDict):
|
||||
"""Dict with a limited size, ejecting LRUs as needed.
|
||||
Default max size = 10Mb"""
|
||||
|
||||
def __init__(self, *args, cache_max_bytes: int = 1024 * 1024 * 10, **kwargs):
|
||||
assert cache_max_bytes > 0
|
||||
self.total_value_size = 0
|
||||
self.cache_max_bytes = cache_max_bytes
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def __setitem__(self, key: str, value):
|
||||
if key in self:
|
||||
self.total_value_size -= sys.getsizeof(super().__getitem__(key).tobytes())
|
||||
self.total_value_size += sys.getsizeof(value.tobytes())
|
||||
super().__setitem__(key, value)
|
||||
super().move_to_end(key)
|
||||
|
||||
while self.total_value_size > self.cache_max_bytes:
|
||||
old_key, value = next(iter(self.items()))
|
||||
sz = sys.getsizeof(value.tobytes())
|
||||
super().__delitem__(old_key)
|
||||
self.total_value_size -= sz
|
||||
|
||||
def __getitem__(self, key: str):
|
||||
val = super().__getitem__(key)
|
||||
super().move_to_end(key)
|
||||
return val
|
||||
|
|
|
@ -1,26 +1,22 @@
|
|||
import click
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import subprocess
|
||||
import tempfile
|
||||
import unicodedata
|
||||
import warnings
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import click
|
||||
|
||||
from toot.exceptions import ConsoleError
|
||||
from typing import Any, Dict, Generator, List, Optional
|
||||
from urllib.parse import urlparse, urlencode, quote, unquote
|
||||
|
||||
|
||||
def str_bool(b):
|
||||
def str_bool(b: bool) -> str:
|
||||
"""Convert boolean to string, in the way expected by the API."""
|
||||
return "true" if b else "false"
|
||||
|
||||
|
||||
def str_bool_nullable(b):
|
||||
def str_bool_nullable(b: Optional[bool]) -> Optional[str]:
|
||||
"""Similar to str_bool, but leave None as None"""
|
||||
return None if b is None else str_bool(b)
|
||||
|
||||
|
@ -34,7 +30,7 @@ def parse_html(html: str) -> BeautifulSoup:
|
|||
return BeautifulSoup(html.replace("'", "'"), "html.parser")
|
||||
|
||||
|
||||
def get_text(html):
|
||||
def get_text(html: str) -> str:
|
||||
"""Converts html to text, strips all tags."""
|
||||
text = parse_html(html).get_text()
|
||||
return unicodedata.normalize("NFKC", text)
|
||||
|
@ -53,7 +49,7 @@ def html_to_paragraphs(html: str) -> List[List[str]]:
|
|||
return [[get_text(line) for line in p] for p in paragraphs]
|
||||
|
||||
|
||||
def format_content(content):
|
||||
def format_content(content: str) -> Generator[str, None, None]:
|
||||
"""Given a Status contents in HTML, converts it into lines of plain text.
|
||||
|
||||
Returns a generator yielding lines of content.
|
||||
|
@ -73,25 +69,12 @@ def format_content(content):
|
|||
first = False
|
||||
|
||||
|
||||
def domain_exists(name):
|
||||
try:
|
||||
socket.gethostbyname(name)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def assert_domain_exists(domain):
|
||||
if not domain_exists(domain):
|
||||
raise ConsoleError("Domain {} not found".format(domain))
|
||||
|
||||
|
||||
EOF_KEY = "Ctrl-Z" if os.name == 'nt' else "Ctrl-D"
|
||||
|
||||
|
||||
def multiline_input():
|
||||
def multiline_input() -> str:
|
||||
"""Lets user input multiple lines of text, terminated by EOF."""
|
||||
lines = []
|
||||
lines: List[str] = []
|
||||
while True:
|
||||
try:
|
||||
lines.append(input())
|
||||
|
|
|
@ -4,7 +4,7 @@ import os
|
|||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
def parse_datetime(value):
|
||||
def parse_datetime(value: str) -> datetime:
|
||||
"""Returns an aware datetime in local timezone"""
|
||||
dttm = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user