mirror of
https://github.com/ihabunek/toot.git
synced 2024-09-29 04:35:54 -04:00
Merge pull request #341
This commit is contained in:
commit
2166918da2
2
.flake8
2
.flake8
@ -1,4 +1,4 @@
|
||||
[flake8]
|
||||
exclude=build,tests,tmp,venv,toot/tui/scroll.py
|
||||
ignore=E128
|
||||
ignore=E128,W503
|
||||
max-line-length=120
|
||||
|
@ -2,3 +2,4 @@ requests>=2.13,<3.0
|
||||
beautifulsoup4>=4.5.0,<5.0
|
||||
wcwidth>=0.1.7
|
||||
urwid>=2.0.0,<3.0
|
||||
|
||||
|
0
tests/integration/__init__.py
Normal file
0
tests/integration/__init__.py
Normal file
122
tests/integration/conftest.py
Normal file
122
tests/integration/conftest.py
Normal file
@ -0,0 +1,122 @@
|
||||
"""
|
||||
This module contains integration tests meant to run against a test Mastodon instance.
|
||||
|
||||
You can set up a test instance locally by following this guide:
|
||||
https://docs.joinmastodon.org/dev/setup/
|
||||
|
||||
To enable integration tests, export the following environment variables to match
|
||||
your test server and database:
|
||||
|
||||
```
|
||||
export TOOT_TEST_HOSTNAME="localhost:3000"
|
||||
export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development"
|
||||
```
|
||||
"""
|
||||
|
||||
import re
|
||||
import os
|
||||
import psycopg2
|
||||
import pytest
|
||||
import uuid
|
||||
|
||||
from pathlib import Path
|
||||
from toot import api, App, User
|
||||
from toot.console import run_command
|
||||
|
||||
# Host name of a test instance to run integration tests against
|
||||
# DO NOT USE PUBLIC INSTANCES!!!
|
||||
BASE_URL = os.getenv("TOOT_TEST_BASE_URL")
|
||||
|
||||
# Mastodon database name, used to confirm user registration without having to click the link
|
||||
DATABASE_DSN = os.getenv("TOOT_TEST_DATABASE_DSN")
|
||||
|
||||
# Toot logo used for testing image upload
|
||||
TRUMPET = str(Path(__file__).parent.parent.parent / "trumpet.png")
|
||||
|
||||
ASSETS_DIR = str(Path(__file__).parent.parent / "assets")
|
||||
|
||||
|
||||
if not BASE_URL or not DATABASE_DSN:
|
||||
pytest.skip("Skipping integration tests", allow_module_level=True)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
|
||||
def create_app():
|
||||
instance = api.get_instance(BASE_URL)
|
||||
response = api.create_app(BASE_URL)
|
||||
return App(instance["uri"], BASE_URL, response["client_id"], response["client_secret"])
|
||||
|
||||
|
||||
def register_account(app: App):
|
||||
username = str(uuid.uuid4())[-10:]
|
||||
email = f"{username}@example.com"
|
||||
|
||||
response = api.register_account(app, username, email, "password", "en")
|
||||
confirm_user(email)
|
||||
return User(app.instance, username, response["access_token"])
|
||||
|
||||
|
||||
def confirm_user(email):
|
||||
conn = psycopg2.connect(DATABASE_DSN)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("UPDATE users SET confirmed_at = now() WHERE email = %s;", (email,))
|
||||
conn.commit()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def app():
|
||||
return create_app()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def user(app):
|
||||
return register_account(app)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def friend(app):
|
||||
return register_account(app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def run(app, user, capsys):
|
||||
def _run(command, *params, as_user=None):
|
||||
run_command(app, as_user or user, command, params or [])
|
||||
out, err = capsys.readouterr()
|
||||
assert err == ""
|
||||
return strip_ansi(out)
|
||||
return _run
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def run_anon(capsys):
|
||||
def _run(command, *params):
|
||||
run_command(None, None, command, params or [])
|
||||
out, err = capsys.readouterr()
|
||||
assert err == ""
|
||||
return strip_ansi(out)
|
||||
return _run
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Utils
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
strip_ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
|
||||
|
||||
|
||||
def strip_ansi(string):
|
||||
return strip_ansi_pattern.sub("", string).strip()
|
||||
|
||||
|
||||
def posted_status_id(out):
|
||||
pattern = re.compile(r"Toot posted: http://([^/]+)/([^/]+)/(.+)")
|
||||
match = re.search(pattern, out)
|
||||
assert match
|
||||
|
||||
_, _, status_id = match.groups()
|
||||
|
||||
return status_id
|
19
tests/integration/test_accounts.py
Normal file
19
tests/integration/test_accounts.py
Normal file
@ -0,0 +1,19 @@
|
||||
|
||||
|
||||
def test_whoami(user, run):
|
||||
out = run("whoami")
|
||||
# TODO: test other fields once updating account is supported
|
||||
assert f"@{user.username}" in out
|
||||
|
||||
|
||||
def test_whois(app, friend, run):
|
||||
variants = [
|
||||
friend.username,
|
||||
f"@{friend.username}",
|
||||
f"{friend.username}@{app.instance}",
|
||||
f"@{friend.username}@{app.instance}",
|
||||
]
|
||||
|
||||
for username in variants:
|
||||
out = run("whois", username)
|
||||
assert f"@{friend.username}" in out
|
125
tests/integration/test_auth.py
Normal file
125
tests/integration/test_auth.py
Normal file
@ -0,0 +1,125 @@
|
||||
import pytest
|
||||
|
||||
from tests.integration.conftest import TRUMPET
|
||||
from toot import api
|
||||
from toot.exceptions import ConsoleError
|
||||
from toot.utils import get_text
|
||||
|
||||
|
||||
def test_update_account_no_options(run):
|
||||
with pytest.raises(ConsoleError) as exc:
|
||||
run("update_account")
|
||||
assert str(exc.value) == "Please specify at least one option to update the account"
|
||||
|
||||
|
||||
def test_update_account_display_name(run, app, user):
|
||||
out = run("update_account", "--display-name", "elwood")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["display_name"] == "elwood"
|
||||
|
||||
|
||||
def test_update_account_note(run, app, user):
|
||||
note = ("It's 106 miles to Chicago, we got a full tank of gas, half a pack "
|
||||
"of cigarettes, it's dark... and we're wearing sunglasses.")
|
||||
|
||||
out = run("update_account", "--note", note)
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert get_text(account["note"]) == note
|
||||
|
||||
|
||||
def test_update_account_language(run, app, user):
|
||||
out = run("update_account", "--language", "hr")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["source"]["language"] == "hr"
|
||||
|
||||
|
||||
def test_update_account_privacy(run, app, user):
|
||||
out = run("update_account", "--privacy", "private")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["source"]["privacy"] == "private"
|
||||
|
||||
|
||||
def test_update_account_avatar(run, app, user):
|
||||
account = api.verify_credentials(app, user)
|
||||
old_value = account["avatar"]
|
||||
|
||||
out = run("update_account", "--avatar", TRUMPET)
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["avatar"] != old_value
|
||||
|
||||
|
||||
def test_update_account_header(run, app, user):
|
||||
account = api.verify_credentials(app, user)
|
||||
old_value = account["header"]
|
||||
|
||||
out = run("update_account", "--header", TRUMPET)
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["header"] != old_value
|
||||
|
||||
|
||||
def test_update_account_locked(run, app, user):
|
||||
out = run("update_account", "--locked")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["locked"] is True
|
||||
|
||||
out = run("update_account", "--no-locked")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["locked"] is False
|
||||
|
||||
|
||||
def test_update_account_bot(run, app, user):
|
||||
out = run("update_account", "--bot")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["bot"] is True
|
||||
|
||||
out = run("update_account", "--no-bot")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["bot"] is False
|
||||
|
||||
|
||||
def test_update_account_discoverable(run, app, user):
|
||||
out = run("update_account", "--discoverable")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["discoverable"] is True
|
||||
|
||||
out = run("update_account", "--no-discoverable")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["discoverable"] is False
|
||||
|
||||
|
||||
def test_update_account_sensitive(run, app, user):
|
||||
out = run("update_account", "--sensitive")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["source"]["sensitive"] is True
|
||||
|
||||
out = run("update_account", "--no-sensitive")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["source"]["sensitive"] is False
|
58
tests/integration/test_lists.py
Normal file
58
tests/integration/test_lists.py
Normal file
@ -0,0 +1,58 @@
|
||||
|
||||
from toot import api
|
||||
from tests.integration.conftest import register_account
|
||||
|
||||
|
||||
def test_lists_empty(run):
|
||||
out = run("lists")
|
||||
assert out == "You have no lists defined."
|
||||
|
||||
|
||||
def test_list_create_delete(run):
|
||||
out = run("list_create", "banana")
|
||||
assert out == '✓ List "banana" created.'
|
||||
|
||||
out = run("lists")
|
||||
assert "banana" in out
|
||||
|
||||
out = run("list_create", "mango")
|
||||
assert out == '✓ List "mango" created.'
|
||||
|
||||
out = run("lists")
|
||||
assert "banana" in out
|
||||
assert "mango" in out
|
||||
|
||||
out = run("list_delete", "banana")
|
||||
assert out == '✓ List "banana" deleted.'
|
||||
|
||||
out = run("lists")
|
||||
assert "banana" not in out
|
||||
assert "mango" in out
|
||||
|
||||
out = run("list_delete", "mango")
|
||||
assert out == '✓ List "mango" deleted.'
|
||||
|
||||
out = run("lists")
|
||||
assert out == "You have no lists defined."
|
||||
|
||||
|
||||
def test_list_add_remove(run, app):
|
||||
acc = register_account(app)
|
||||
run("list_create", "foo")
|
||||
|
||||
out = run("list_add", "foo", acc.username)
|
||||
assert out == f"You must follow @{acc.username} before adding this account to a list."
|
||||
|
||||
run("follow", acc.username)
|
||||
|
||||
out = run("list_add", "foo", acc.username)
|
||||
assert out == f'✓ Added account "{acc.username}"'
|
||||
|
||||
out = run("list_accounts", "foo")
|
||||
assert acc.username in out
|
||||
|
||||
out = run("list_remove", "foo", acc.username)
|
||||
assert out == f'✓ Removed account "{acc.username}"'
|
||||
|
||||
out = run("list_accounts", "foo")
|
||||
assert out == "This list has no accounts."
|
288
tests/integration/test_post.py
Normal file
288
tests/integration/test_post.py
Normal file
@ -0,0 +1,288 @@
|
||||
import re
|
||||
import uuid
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from os import path
|
||||
from tests.integration.conftest import ASSETS_DIR, posted_status_id
|
||||
from toot import CLIENT_NAME, CLIENT_WEBSITE, api
|
||||
from toot.utils import get_text
|
||||
from unittest import mock
|
||||
|
||||
|
||||
def test_post(app, user, run):
|
||||
text = "i wish i was a #lumberjack"
|
||||
out = run("post", text)
|
||||
status_id = posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert text == get_text(status["content"])
|
||||
assert status["visibility"] == "public"
|
||||
assert status["sensitive"] is False
|
||||
assert status["spoiler_text"] == ""
|
||||
assert status["poll"] is None
|
||||
|
||||
# Pleroma doesn't return the application
|
||||
if status["application"]:
|
||||
assert status["application"]["name"] == CLIENT_NAME
|
||||
assert status["application"]["website"] == CLIENT_WEBSITE
|
||||
|
||||
|
||||
def test_post_visibility(app, user, run):
|
||||
for visibility in ["public", "unlisted", "private", "direct"]:
|
||||
out = run("post", "foo", "--visibility", visibility)
|
||||
status_id = posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["visibility"] == visibility
|
||||
|
||||
|
||||
def test_post_scheduled_at(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
scheduled_at = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=10)
|
||||
|
||||
out = run("post", text, "--scheduled-at", scheduled_at.isoformat())
|
||||
assert "Toot scheduled for" in out
|
||||
|
||||
statuses = api.scheduled_statuses(app, user)
|
||||
[status] = [s for s in statuses if s["params"]["text"] == text]
|
||||
assert datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%f%z") == scheduled_at
|
||||
|
||||
|
||||
def test_post_scheduled_in(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
variants = [
|
||||
("1 day", timedelta(days=1)),
|
||||
("1 day 6 hours", timedelta(days=1, hours=6)),
|
||||
("1 day 6 hours 13 minutes", timedelta(days=1, hours=6, minutes=13)),
|
||||
("1 day 6 hours 13 minutes 51 second", timedelta(days=1, hours=6, minutes=13, seconds=51)),
|
||||
("2d", timedelta(days=2)),
|
||||
("2d6h", timedelta(days=2, hours=6)),
|
||||
("2d6h13m", timedelta(days=2, hours=6, minutes=13)),
|
||||
("2d6h13m51s", timedelta(days=2, hours=6, minutes=13, seconds=51)),
|
||||
]
|
||||
|
||||
datetimes = []
|
||||
for scheduled_in, delta in variants:
|
||||
out = run("post", text, "--scheduled-in", scheduled_in)
|
||||
dttm = datetime.utcnow() + delta
|
||||
assert out.startswith(f"Toot scheduled for: {str(dttm)[:16]}")
|
||||
datetimes.append(dttm)
|
||||
|
||||
scheduled = api.scheduled_statuses(app, user)
|
||||
scheduled = [s for s in scheduled if s["params"]["text"] == text]
|
||||
scheduled = sorted(scheduled, key=lambda s: s["scheduled_at"])
|
||||
assert len(scheduled) == 8
|
||||
|
||||
for expected, status in zip(datetimes, scheduled):
|
||||
actual = datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
|
||||
delta = expected - actual
|
||||
assert delta.total_seconds() < 5
|
||||
|
||||
|
||||
def test_post_poll(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
out = run(
|
||||
"post", text,
|
||||
"--poll-option", "foo",
|
||||
"--poll-option", "bar",
|
||||
"--poll-option", "baz",
|
||||
"--poll-option", "qux",
|
||||
)
|
||||
|
||||
status_id = posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["poll"]["expired"] is False
|
||||
assert status["poll"]["multiple"] is False
|
||||
assert status["poll"]["options"] == [
|
||||
{"title": "foo", "votes_count": 0},
|
||||
{"title": "bar", "votes_count": 0},
|
||||
{"title": "baz", "votes_count": 0},
|
||||
{"title": "qux", "votes_count": 0}
|
||||
]
|
||||
|
||||
# Test expires_at is 24h by default
|
||||
actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||
expected = datetime.now(timezone.utc) + timedelta(days=1)
|
||||
delta = actual - expected
|
||||
assert delta.total_seconds() < 5
|
||||
|
||||
|
||||
def test_post_poll_multiple(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
out = run(
|
||||
"post", text,
|
||||
"--poll-option", "foo",
|
||||
"--poll-option", "bar",
|
||||
"--poll-multiple"
|
||||
)
|
||||
|
||||
status_id = posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["poll"]["multiple"] is True
|
||||
|
||||
|
||||
def test_post_poll_expires_in(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
out = run(
|
||||
"post", text,
|
||||
"--poll-option", "foo",
|
||||
"--poll-option", "bar",
|
||||
"--poll-expires-in", "8h",
|
||||
)
|
||||
|
||||
status_id = posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||
expected = datetime.now(timezone.utc) + timedelta(hours=8)
|
||||
delta = actual - expected
|
||||
assert delta.total_seconds() < 5
|
||||
|
||||
|
||||
def test_post_poll_hide_totals(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
out = run(
|
||||
"post", text,
|
||||
"--poll-option", "foo",
|
||||
"--poll-option", "bar",
|
||||
"--poll-hide-totals"
|
||||
)
|
||||
|
||||
status_id = posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
|
||||
# votes_count is None when totals are hidden
|
||||
assert status["poll"]["options"] == [
|
||||
{"title": "foo", "votes_count": None},
|
||||
{"title": "bar", "votes_count": None},
|
||||
]
|
||||
|
||||
|
||||
def test_post_language(app, user, run):
|
||||
out = run("post", "test", "--language", "hr")
|
||||
status_id = posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["language"] == "hr"
|
||||
|
||||
out = run("post", "test", "--language", "zh")
|
||||
status_id = posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["language"] == "zh"
|
||||
|
||||
|
||||
def test_media_thumbnail(app, user, run):
|
||||
video_path = path.join(ASSETS_DIR, "small.webm")
|
||||
thumbnail_path = path.join(ASSETS_DIR, "test1.png")
|
||||
|
||||
out = run(
|
||||
"post",
|
||||
"--media", video_path,
|
||||
"--thumbnail", thumbnail_path,
|
||||
"--description", "foo",
|
||||
"some text"
|
||||
)
|
||||
|
||||
status_id = posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
[media] = status["media_attachments"]
|
||||
|
||||
assert media["description"] == "foo"
|
||||
assert media["type"] == "video"
|
||||
assert media["url"].endswith(".mp4")
|
||||
assert media["preview_url"].endswith(".png")
|
||||
|
||||
# Video properties
|
||||
assert int(media["meta"]["original"]["duration"]) == 5
|
||||
assert media["meta"]["original"]["height"] == 320
|
||||
assert media["meta"]["original"]["width"] == 560
|
||||
|
||||
# Thumbnail properties
|
||||
assert media["meta"]["small"]["height"] == 50
|
||||
assert media["meta"]["small"]["width"] == 50
|
||||
|
||||
|
||||
def test_media_attachments(app, user, run):
|
||||
path1 = path.join(ASSETS_DIR, "test1.png")
|
||||
path2 = path.join(ASSETS_DIR, "test2.png")
|
||||
path3 = path.join(ASSETS_DIR, "test3.png")
|
||||
path4 = path.join(ASSETS_DIR, "test4.png")
|
||||
|
||||
out = run(
|
||||
"post",
|
||||
"--media", path1,
|
||||
"--media", path2,
|
||||
"--media", path3,
|
||||
"--media", path4,
|
||||
"--description", "Test 1",
|
||||
"--description", "Test 2",
|
||||
"--description", "Test 3",
|
||||
"--description", "Test 4",
|
||||
"some text"
|
||||
)
|
||||
|
||||
status_id = posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
|
||||
[a1, a2, a3, a4] = status["media_attachments"]
|
||||
|
||||
# Pleroma doesn't send metadata
|
||||
if "meta" in a1:
|
||||
assert a1["meta"]["original"]["size"] == "50x50"
|
||||
assert a2["meta"]["original"]["size"] == "50x60"
|
||||
assert a3["meta"]["original"]["size"] == "50x70"
|
||||
assert a4["meta"]["original"]["size"] == "50x80"
|
||||
|
||||
assert a1["description"] == "Test 1"
|
||||
assert a2["description"] == "Test 2"
|
||||
assert a3["description"] == "Test 3"
|
||||
assert a4["description"] == "Test 4"
|
||||
|
||||
|
||||
@mock.patch("toot.utils.multiline_input")
|
||||
@mock.patch("sys.stdin.read")
|
||||
def test_media_attachment_without_text(mock_read, mock_ml, app, user, run):
|
||||
# No status from stdin or readline
|
||||
mock_read.return_value = ""
|
||||
mock_ml.return_value = ""
|
||||
|
||||
media_path = path.join(ASSETS_DIR, "test1.png")
|
||||
|
||||
out = run("post", "--media", media_path)
|
||||
status_id = posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["content"] == ""
|
||||
|
||||
[attachment] = status["media_attachments"]
|
||||
assert not attachment["description"]
|
||||
|
||||
# Pleroma doesn't send metadata
|
||||
if "meta" in attachment:
|
||||
assert attachment["meta"]["original"]["size"] == "50x50"
|
||||
|
||||
|
||||
def test_reply_thread(app, user, friend, run):
|
||||
status = api.post_status(app, friend, "This is the status")
|
||||
|
||||
out = run("post", "--reply-to", status["id"], "This is the reply")
|
||||
status_id = posted_status_id(out)
|
||||
reply = api.fetch_status(app, user, status_id)
|
||||
|
||||
assert reply["in_reply_to_id"] == status["id"]
|
||||
|
||||
out = run("thread", status["id"])
|
||||
[s1, s2] = [s.strip() for s in re.split(r"─+", out) if s.strip()]
|
||||
|
||||
assert "This is the status" in s1
|
||||
assert "This is the reply" in s2
|
||||
assert friend.username in s1
|
||||
assert user.username in s2
|
||||
assert status["id"] in s1
|
||||
assert reply["id"] in s2
|
83
tests/integration/test_read.py
Normal file
83
tests/integration/test_read.py
Normal file
@ -0,0 +1,83 @@
|
||||
import pytest
|
||||
|
||||
from tests.integration.conftest import BASE_URL
|
||||
from toot import api
|
||||
from toot.exceptions import ConsoleError
|
||||
|
||||
|
||||
def test_instance(app, run):
|
||||
out = run("instance", "--disable-https")
|
||||
assert "Mastodon" in out
|
||||
assert app.instance in out
|
||||
assert "running Mastodon" in out
|
||||
|
||||
|
||||
def test_instance_anon(app, run_anon):
|
||||
out = run_anon("instance", BASE_URL)
|
||||
assert "Mastodon" in out
|
||||
assert app.instance in out
|
||||
assert "running Mastodon" in out
|
||||
|
||||
# Need to specify the instance name when running anon
|
||||
with pytest.raises(ConsoleError) as exc:
|
||||
run_anon("instance")
|
||||
assert str(exc.value) == "Please specify an instance."
|
||||
|
||||
|
||||
def test_whoami(user, run):
|
||||
out = run("whoami")
|
||||
# TODO: test other fields once updating account is supported
|
||||
assert f"@{user.username}" in out
|
||||
|
||||
|
||||
def test_whois(app, friend, run):
|
||||
variants = [
|
||||
friend.username,
|
||||
f"@{friend.username}",
|
||||
f"{friend.username}@{app.instance}",
|
||||
f"@{friend.username}@{app.instance}",
|
||||
]
|
||||
|
||||
for username in variants:
|
||||
out = run("whois", username)
|
||||
assert f"@{friend.username}" in out
|
||||
|
||||
|
||||
def test_search_account(friend, run):
|
||||
out = run("search", friend.username)
|
||||
assert out == f"Accounts:\n* @{friend.username}"
|
||||
|
||||
|
||||
def test_search_hashtag(app, user, run):
|
||||
api.post_status(app, user, "#hashtag_x")
|
||||
api.post_status(app, user, "#hashtag_y")
|
||||
api.post_status(app, user, "#hashtag_z")
|
||||
|
||||
out = run("search", "#hashtag")
|
||||
assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z"
|
||||
|
||||
|
||||
def test_tags(run):
|
||||
out = run("tags_followed")
|
||||
assert out == "You're not following any hashtags."
|
||||
|
||||
out = run("tags_follow", "foo")
|
||||
assert out == "✓ You are now following #foo"
|
||||
|
||||
out = run("tags_followed")
|
||||
assert out == f"* #foo\t{BASE_URL}/tags/foo"
|
||||
|
||||
out = run("tags_follow", "bar")
|
||||
assert out == "✓ You are now following #bar"
|
||||
|
||||
out = run("tags_followed")
|
||||
assert out == "\n".join([
|
||||
f"* #bar\t{BASE_URL}/tags/bar",
|
||||
f"* #foo\t{BASE_URL}/tags/foo",
|
||||
])
|
||||
|
||||
out = run("tags_unfollow", "foo")
|
||||
assert out == "✓ You are no longer following #foo"
|
||||
|
||||
out = run("tags_followed")
|
||||
assert out == f"* #bar\t{BASE_URL}/tags/bar"
|
89
tests/integration/test_status.py
Normal file
89
tests/integration/test_status.py
Normal file
@ -0,0 +1,89 @@
|
||||
import time
|
||||
import pytest
|
||||
|
||||
from toot import api
|
||||
from toot.exceptions import NotFoundError
|
||||
|
||||
|
||||
def test_delete_status(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
|
||||
out = run("delete", status["id"])
|
||||
assert out == "✓ Status deleted"
|
||||
|
||||
with pytest.raises(NotFoundError):
|
||||
api.fetch_status(app, user, status["id"])
|
||||
|
||||
|
||||
def test_favourite(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
assert not status["favourited"]
|
||||
|
||||
out = run("favourite", status["id"])
|
||||
assert out == "✓ Status favourited"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert status["favourited"]
|
||||
|
||||
out = run("unfavourite", status["id"])
|
||||
assert out == "✓ Status unfavourited"
|
||||
|
||||
# A short delay is required before the server returns new data
|
||||
time.sleep(0.1)
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert not status["favourited"]
|
||||
|
||||
|
||||
def test_reblog(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
assert not status["reblogged"]
|
||||
|
||||
out = run("reblog", status["id"])
|
||||
assert out == "✓ Status reblogged"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert status["reblogged"]
|
||||
|
||||
out = run("reblogged_by", status["id"])
|
||||
assert out == f"@{user.username}"
|
||||
|
||||
out = run("unreblog", status["id"])
|
||||
assert out == "✓ Status unreblogged"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert not status["reblogged"]
|
||||
|
||||
|
||||
def test_pin(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
assert not status["pinned"]
|
||||
|
||||
out = run("pin", status["id"])
|
||||
assert out == "✓ Status pinned"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert status["pinned"]
|
||||
|
||||
out = run("unpin", status["id"])
|
||||
assert out == "✓ Status unpinned"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert not status["pinned"]
|
||||
|
||||
|
||||
def test_bookmark(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
assert not status["bookmarked"]
|
||||
|
||||
out = run("bookmark", status["id"])
|
||||
assert out == "✓ Status bookmarked"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert status["bookmarked"]
|
||||
|
||||
out = run("unbookmark", status["id"])
|
||||
assert out == "✓ Status unbookmarked"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert not status["bookmarked"]
|
@ -1,758 +0,0 @@
|
||||
"""
|
||||
This module contains integration tests meant to run against a test Mastodon instance.
|
||||
|
||||
You can set up a test instance locally by following this guide:
|
||||
https://docs.joinmastodon.org/dev/setup/
|
||||
|
||||
To enable integration tests, export the following environment variables to match
|
||||
your test server and database:
|
||||
|
||||
```
|
||||
export TOOT_TEST_HOSTNAME="localhost:3000"
|
||||
export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development"
|
||||
```
|
||||
"""
|
||||
|
||||
import os
|
||||
import psycopg2
|
||||
import pytest
|
||||
import re
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from os import path
|
||||
from toot import CLIENT_NAME, CLIENT_WEBSITE, api, App, User
|
||||
from toot.console import run_command
|
||||
from toot.exceptions import ConsoleError, NotFoundError
|
||||
from toot.utils import get_text
|
||||
from unittest import mock
|
||||
|
||||
# Host name of a test instance to run integration tests against
|
||||
# DO NOT USE PUBLIC INSTANCES!!!
|
||||
BASE_URL = os.getenv("TOOT_TEST_BASE_URL")
|
||||
|
||||
# Mastodon database name, used to confirm user registration without having to click the link
|
||||
DATABASE_DSN = os.getenv("TOOT_TEST_DATABASE_DSN")
|
||||
|
||||
# Toot logo used for testing image upload
|
||||
TRUMPET = path.join(path.dirname(path.dirname(path.realpath(__file__))), "trumpet.png")
|
||||
|
||||
|
||||
if not BASE_URL or not DATABASE_DSN:
|
||||
pytest.skip("Skipping integration tests", allow_module_level=True)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
|
||||
def create_app():
|
||||
instance = api.get_instance(BASE_URL)
|
||||
response = api.create_app(BASE_URL)
|
||||
return App(instance["uri"], BASE_URL, response["client_id"], response["client_secret"])
|
||||
|
||||
|
||||
def register_account(app: App):
|
||||
username = str(uuid.uuid4())[-10:]
|
||||
email = f"{username}@example.com"
|
||||
|
||||
response = api.register_account(app, username, email, "password", "en")
|
||||
confirm_user(email)
|
||||
return User(app.instance, username, response["access_token"])
|
||||
|
||||
|
||||
def confirm_user(email):
|
||||
conn = psycopg2.connect(DATABASE_DSN)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("UPDATE users SET confirmed_at = now() WHERE email = %s;", (email,))
|
||||
conn.commit()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def app():
|
||||
return create_app()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def user(app):
|
||||
return register_account(app)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def friend(app):
|
||||
return register_account(app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def run(app, user, capsys):
|
||||
def _run(command, *params, as_user=None):
|
||||
run_command(app, as_user or user, command, params or [])
|
||||
out, err = capsys.readouterr()
|
||||
assert err == ""
|
||||
return strip_ansi(out)
|
||||
return _run
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def run_anon(capsys):
|
||||
def _run(command, *params):
|
||||
run_command(None, None, command, params or [])
|
||||
out, err = capsys.readouterr()
|
||||
assert err == ""
|
||||
return strip_ansi(out)
|
||||
return _run
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_instance(app, run):
|
||||
out = run("instance", "--disable-https")
|
||||
assert "Mastodon" in out
|
||||
assert app.instance in out
|
||||
assert "running Mastodon" in out
|
||||
|
||||
|
||||
def test_instance_anon(app, run_anon):
|
||||
out = run_anon("instance", BASE_URL)
|
||||
assert "Mastodon" in out
|
||||
assert app.instance in out
|
||||
assert "running Mastodon" in out
|
||||
|
||||
# Need to specify the instance name when running anon
|
||||
with pytest.raises(ConsoleError) as exc:
|
||||
run_anon("instance")
|
||||
assert str(exc.value) == "Please specify an instance."
|
||||
|
||||
|
||||
def test_post(app, user, run):
|
||||
text = "i wish i was a #lumberjack"
|
||||
out = run("post", text)
|
||||
status_id = _posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert text == get_text(status["content"])
|
||||
assert status["visibility"] == "public"
|
||||
assert status["sensitive"] is False
|
||||
assert status["spoiler_text"] == ""
|
||||
assert status["poll"] is None
|
||||
|
||||
# Pleroma doesn't return the application
|
||||
if status["application"]:
|
||||
assert status["application"]["name"] == CLIENT_NAME
|
||||
assert status["application"]["website"] == CLIENT_WEBSITE
|
||||
|
||||
|
||||
def test_post_visibility(app, user, run):
|
||||
for visibility in ["public", "unlisted", "private", "direct"]:
|
||||
out = run("post", "foo", "--visibility", visibility)
|
||||
status_id = _posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["visibility"] == visibility
|
||||
|
||||
|
||||
def test_post_scheduled_at(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
scheduled_at = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=10)
|
||||
|
||||
out = run("post", text, "--scheduled-at", scheduled_at.isoformat())
|
||||
assert "Toot scheduled for" in out
|
||||
|
||||
statuses = api.scheduled_statuses(app, user)
|
||||
[status] = [s for s in statuses if s["params"]["text"] == text]
|
||||
assert datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%f%z") == scheduled_at
|
||||
|
||||
|
||||
def test_post_scheduled_in(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
variants = [
|
||||
("1 day", timedelta(days=1)),
|
||||
("1 day 6 hours", timedelta(days=1, hours=6)),
|
||||
("1 day 6 hours 13 minutes", timedelta(days=1, hours=6, minutes=13)),
|
||||
("1 day 6 hours 13 minutes 51 second", timedelta(days=1, hours=6, minutes=13, seconds=51)),
|
||||
("2d", timedelta(days=2)),
|
||||
("2d6h", timedelta(days=2, hours=6)),
|
||||
("2d6h13m", timedelta(days=2, hours=6, minutes=13)),
|
||||
("2d6h13m51s", timedelta(days=2, hours=6, minutes=13, seconds=51)),
|
||||
]
|
||||
|
||||
datetimes = []
|
||||
for scheduled_in, delta in variants:
|
||||
out = run("post", text, "--scheduled-in", scheduled_in)
|
||||
dttm = datetime.utcnow() + delta
|
||||
assert out.startswith(f"Toot scheduled for: {str(dttm)[:16]}")
|
||||
datetimes.append(dttm)
|
||||
|
||||
scheduled = api.scheduled_statuses(app, user)
|
||||
scheduled = [s for s in scheduled if s["params"]["text"] == text]
|
||||
scheduled = sorted(scheduled, key=lambda s: s["scheduled_at"])
|
||||
assert len(scheduled) == 8
|
||||
|
||||
for expected, status in zip(datetimes, scheduled):
|
||||
actual = datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
|
||||
delta = expected - actual
|
||||
assert delta.total_seconds() < 5
|
||||
|
||||
|
||||
def test_post_poll(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
out = run(
|
||||
"post", text,
|
||||
"--poll-option", "foo",
|
||||
"--poll-option", "bar",
|
||||
"--poll-option", "baz",
|
||||
"--poll-option", "qux",
|
||||
)
|
||||
|
||||
status_id = _posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["poll"]["expired"] is False
|
||||
assert status["poll"]["multiple"] is False
|
||||
assert status["poll"]["options"] == [
|
||||
{"title": "foo", "votes_count": 0},
|
||||
{"title": "bar", "votes_count": 0},
|
||||
{"title": "baz", "votes_count": 0},
|
||||
{"title": "qux", "votes_count": 0}
|
||||
]
|
||||
|
||||
# Test expires_at is 24h by default
|
||||
actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||
expected = datetime.now(timezone.utc) + timedelta(days=1)
|
||||
delta = actual - expected
|
||||
assert delta.total_seconds() < 5
|
||||
|
||||
|
||||
def test_post_poll_multiple(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
out = run(
|
||||
"post", text,
|
||||
"--poll-option", "foo",
|
||||
"--poll-option", "bar",
|
||||
"--poll-multiple"
|
||||
)
|
||||
|
||||
status_id = _posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["poll"]["multiple"] is True
|
||||
|
||||
|
||||
def test_post_poll_expires_in(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
out = run(
|
||||
"post", text,
|
||||
"--poll-option", "foo",
|
||||
"--poll-option", "bar",
|
||||
"--poll-expires-in", "8h",
|
||||
)
|
||||
|
||||
status_id = _posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
actual = datetime.strptime(status["poll"]["expires_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||
expected = datetime.now(timezone.utc) + timedelta(hours=8)
|
||||
delta = actual - expected
|
||||
assert delta.total_seconds() < 5
|
||||
|
||||
|
||||
def test_post_poll_hide_totals(app, user, run):
|
||||
text = str(uuid.uuid4())
|
||||
|
||||
out = run(
|
||||
"post", text,
|
||||
"--poll-option", "foo",
|
||||
"--poll-option", "bar",
|
||||
"--poll-hide-totals"
|
||||
)
|
||||
|
||||
status_id = _posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
|
||||
# votes_count is None when totals are hidden
|
||||
assert status["poll"]["options"] == [
|
||||
{"title": "foo", "votes_count": None},
|
||||
{"title": "bar", "votes_count": None},
|
||||
]
|
||||
|
||||
|
||||
def test_post_language(app, user, run):
|
||||
out = run("post", "test", "--language", "hr")
|
||||
status_id = _posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["language"] == "hr"
|
||||
|
||||
out = run("post", "test", "--language", "zh")
|
||||
status_id = _posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["language"] == "zh"
|
||||
|
||||
|
||||
def test_media_thumbnail(app, user, run):
|
||||
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
|
||||
|
||||
video_path = path.join(assets_dir, "small.webm")
|
||||
thumbnail_path = path.join(assets_dir, "test1.png")
|
||||
|
||||
out = run(
|
||||
"post",
|
||||
"--media", video_path,
|
||||
"--thumbnail", thumbnail_path,
|
||||
"--description", "foo",
|
||||
"some text"
|
||||
)
|
||||
|
||||
status_id = _posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
[media] = status["media_attachments"]
|
||||
|
||||
assert media["description"] == "foo"
|
||||
assert media["type"] == "video"
|
||||
assert media["url"].endswith(".mp4")
|
||||
assert media["preview_url"].endswith(".png")
|
||||
|
||||
# Video properties
|
||||
assert int(media["meta"]["original"]["duration"]) == 5
|
||||
assert media["meta"]["original"]["height"] == 320
|
||||
assert media["meta"]["original"]["width"] == 560
|
||||
|
||||
# Thumbnail properties
|
||||
assert media["meta"]["small"]["height"] == 50
|
||||
assert media["meta"]["small"]["width"] == 50
|
||||
|
||||
|
||||
def test_media_attachments(app, user, run):
|
||||
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
|
||||
|
||||
path1 = path.join(assets_dir, "test1.png")
|
||||
path2 = path.join(assets_dir, "test2.png")
|
||||
path3 = path.join(assets_dir, "test3.png")
|
||||
path4 = path.join(assets_dir, "test4.png")
|
||||
|
||||
out = run(
|
||||
"post",
|
||||
"--media", path1,
|
||||
"--media", path2,
|
||||
"--media", path3,
|
||||
"--media", path4,
|
||||
"--description", "Test 1",
|
||||
"--description", "Test 2",
|
||||
"--description", "Test 3",
|
||||
"--description", "Test 4",
|
||||
"some text"
|
||||
)
|
||||
|
||||
status_id = _posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
|
||||
[a1, a2, a3, a4] = status["media_attachments"]
|
||||
|
||||
# Pleroma doesn't send metadata
|
||||
if "meta" in a1:
|
||||
assert a1["meta"]["original"]["size"] == "50x50"
|
||||
assert a2["meta"]["original"]["size"] == "50x60"
|
||||
assert a3["meta"]["original"]["size"] == "50x70"
|
||||
assert a4["meta"]["original"]["size"] == "50x80"
|
||||
|
||||
assert a1["description"] == "Test 1"
|
||||
assert a2["description"] == "Test 2"
|
||||
assert a3["description"] == "Test 3"
|
||||
assert a4["description"] == "Test 4"
|
||||
|
||||
|
||||
@mock.patch("toot.utils.multiline_input")
|
||||
@mock.patch("sys.stdin.read")
|
||||
def test_media_attachment_without_text(mock_read, mock_ml, app, user, run):
|
||||
# No status from stdin or readline
|
||||
mock_read.return_value = ""
|
||||
mock_ml.return_value = ""
|
||||
|
||||
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
|
||||
media_path = path.join(assets_dir, "test1.png")
|
||||
|
||||
out = run("post", "--media", media_path)
|
||||
status_id = _posted_status_id(out)
|
||||
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
assert status["content"] == ""
|
||||
|
||||
[attachment] = status["media_attachments"]
|
||||
assert not attachment["description"]
|
||||
|
||||
# Pleroma doesn't send metadata
|
||||
if "meta" in attachment:
|
||||
assert attachment["meta"]["original"]["size"] == "50x50"
|
||||
|
||||
|
||||
def test_delete_status(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
|
||||
out = run("delete", status["id"])
|
||||
assert out == "✓ Status deleted"
|
||||
|
||||
with pytest.raises(NotFoundError):
|
||||
api.fetch_status(app, user, status["id"])
|
||||
|
||||
|
||||
def test_reply_thread(app, user, friend, run):
|
||||
status = api.post_status(app, friend, "This is the status")
|
||||
|
||||
out = run("post", "--reply-to", status["id"], "This is the reply")
|
||||
status_id = _posted_status_id(out)
|
||||
reply = api.fetch_status(app, user, status_id)
|
||||
|
||||
assert reply["in_reply_to_id"] == status["id"]
|
||||
|
||||
out = run("thread", status["id"])
|
||||
[s1, s2] = [s.strip() for s in re.split(r"─+", out) if s.strip()]
|
||||
|
||||
assert "This is the status" in s1
|
||||
assert "This is the reply" in s2
|
||||
assert friend.username in s1
|
||||
assert user.username in s2
|
||||
assert status["id"] in s1
|
||||
assert reply["id"] in s2
|
||||
|
||||
|
||||
def test_favourite(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
assert not status["favourited"]
|
||||
|
||||
out = run("favourite", status["id"])
|
||||
assert out == "✓ Status favourited"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert status["favourited"]
|
||||
|
||||
out = run("unfavourite", status["id"])
|
||||
assert out == "✓ Status unfavourited"
|
||||
|
||||
# A short delay is required before the server returns new data
|
||||
time.sleep(0.1)
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert not status["favourited"]
|
||||
|
||||
|
||||
def test_reblog(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
assert not status["reblogged"]
|
||||
|
||||
out = run("reblog", status["id"])
|
||||
assert out == "✓ Status reblogged"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert status["reblogged"]
|
||||
|
||||
out = run("reblogged_by", status["id"])
|
||||
assert out == f"@{user.username}"
|
||||
|
||||
out = run("unreblog", status["id"])
|
||||
assert out == "✓ Status unreblogged"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert not status["reblogged"]
|
||||
|
||||
|
||||
def test_pin(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
assert not status["pinned"]
|
||||
|
||||
out = run("pin", status["id"])
|
||||
assert out == "✓ Status pinned"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert status["pinned"]
|
||||
|
||||
out = run("unpin", status["id"])
|
||||
assert out == "✓ Status unpinned"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert not status["pinned"]
|
||||
|
||||
|
||||
def test_bookmark(app, user, run):
|
||||
status = api.post_status(app, user, "foo")
|
||||
assert not status["bookmarked"]
|
||||
|
||||
out = run("bookmark", status["id"])
|
||||
assert out == "✓ Status bookmarked"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert status["bookmarked"]
|
||||
|
||||
out = run("unbookmark", status["id"])
|
||||
assert out == "✓ Status unbookmarked"
|
||||
|
||||
status = api.fetch_status(app, user, status["id"])
|
||||
assert not status["bookmarked"]
|
||||
|
||||
|
||||
def test_whoami(user, run):
|
||||
out = run("whoami")
|
||||
# TODO: test other fields once updating account is supported
|
||||
assert f"@{user.username}" in out
|
||||
|
||||
|
||||
def test_whois(app, friend, run):
|
||||
variants = [
|
||||
friend.username,
|
||||
f"@{friend.username}",
|
||||
f"{friend.username}@{app.instance}",
|
||||
f"@{friend.username}@{app.instance}",
|
||||
]
|
||||
|
||||
for username in variants:
|
||||
out = run("whois", username)
|
||||
assert f"@{friend.username}" in out
|
||||
|
||||
|
||||
def test_search_account(friend, run):
|
||||
out = run("search", friend.username)
|
||||
assert out == f"Accounts:\n* @{friend.username}"
|
||||
|
||||
|
||||
def test_search_hashtag(app, user, run):
|
||||
api.post_status(app, user, "#hashtag_x")
|
||||
api.post_status(app, user, "#hashtag_y")
|
||||
api.post_status(app, user, "#hashtag_z")
|
||||
|
||||
out = run("search", "#hashtag")
|
||||
assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z"
|
||||
|
||||
|
||||
def test_follow(friend, run):
|
||||
out = run("follow", friend.username)
|
||||
assert out == f"✓ You are now following {friend.username}"
|
||||
|
||||
out = run("unfollow", friend.username)
|
||||
assert out == f"✓ You are no longer following {friend.username}"
|
||||
|
||||
|
||||
def test_follow_case_insensitive(friend, run):
|
||||
username = friend.username.upper()
|
||||
|
||||
out = run("follow", username)
|
||||
assert out == f"✓ You are now following {username}"
|
||||
|
||||
out = run("unfollow", username)
|
||||
assert out == f"✓ You are no longer following {username}"
|
||||
|
||||
|
||||
# TODO: improve testing stderr, catching exceptions is not optimal
|
||||
def test_follow_not_found(run):
|
||||
with pytest.raises(ConsoleError) as ex_info:
|
||||
run("follow", "banana")
|
||||
assert str(ex_info.value) == "Account not found"
|
||||
|
||||
|
||||
def test_mute(app, user, friend, run):
|
||||
out = run("mute", friend.username)
|
||||
assert out == f"✓ You have muted {friend.username}"
|
||||
|
||||
[muted_account] = api.get_muted_accounts(app, user)
|
||||
assert muted_account["acct"] == friend.username
|
||||
|
||||
out = run("unmute", friend.username)
|
||||
assert out == f"✓ {friend.username} is no longer muted"
|
||||
|
||||
assert api.get_muted_accounts(app, user) == []
|
||||
|
||||
|
||||
def test_block(app, user, friend, run):
|
||||
out = run("block", friend.username)
|
||||
assert out == f"✓ You are now blocking {friend.username}"
|
||||
|
||||
[blockd_account] = api.get_blocked_accounts(app, user)
|
||||
assert blockd_account["acct"] == friend.username
|
||||
|
||||
out = run("unblock", friend.username)
|
||||
assert out == f"✓ {friend.username} is no longer blocked"
|
||||
|
||||
assert api.get_blocked_accounts(app, user) == []
|
||||
|
||||
|
||||
def test_following_followers(user, friend, run):
|
||||
out = run("following", user.username)
|
||||
assert out == ""
|
||||
|
||||
run("follow", friend.username)
|
||||
|
||||
out = run("following", user.username)
|
||||
assert out == f"* @{friend.username}"
|
||||
|
||||
out = run("followers", friend.username)
|
||||
assert out == f"* @{user.username}"
|
||||
|
||||
|
||||
def test_tags(run):
|
||||
out = run("tags_followed")
|
||||
assert out == "You're not following any hashtags."
|
||||
|
||||
out = run("tags_follow", "foo")
|
||||
assert out == "✓ You are now following #foo"
|
||||
|
||||
out = run("tags_followed")
|
||||
assert out == f"* #foo\t{BASE_URL}/tags/foo"
|
||||
|
||||
out = run("tags_follow", "bar")
|
||||
assert out == "✓ You are now following #bar"
|
||||
|
||||
out = run("tags_followed")
|
||||
assert out == "\n".join([
|
||||
f"* #bar\t{BASE_URL}/tags/bar",
|
||||
f"* #foo\t{BASE_URL}/tags/foo",
|
||||
])
|
||||
|
||||
out = run("tags_unfollow", "foo")
|
||||
assert out == "✓ You are no longer following #foo"
|
||||
|
||||
out = run("tags_followed")
|
||||
assert out == f"* #bar\t{BASE_URL}/tags/bar"
|
||||
|
||||
|
||||
def test_update_account_no_options(run):
|
||||
with pytest.raises(ConsoleError) as exc:
|
||||
run("update_account")
|
||||
assert str(exc.value) == "Please specify at least one option to update the account"
|
||||
|
||||
|
||||
def test_update_account_display_name(run, app, user):
|
||||
out = run("update_account", "--display-name", "elwood")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["display_name"] == "elwood"
|
||||
|
||||
|
||||
def test_update_account_note(run, app, user):
|
||||
note = ("It's 106 miles to Chicago, we got a full tank of gas, half a pack "
|
||||
"of cigarettes, it's dark... and we're wearing sunglasses.")
|
||||
|
||||
out = run("update_account", "--note", note)
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert get_text(account["note"]) == note
|
||||
|
||||
|
||||
def test_update_account_language(run, app, user):
|
||||
out = run("update_account", "--language", "hr")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["source"]["language"] == "hr"
|
||||
|
||||
|
||||
def test_update_account_privacy(run, app, user):
|
||||
out = run("update_account", "--privacy", "private")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["source"]["privacy"] == "private"
|
||||
|
||||
|
||||
def test_update_account_avatar(run, app, user):
|
||||
account = api.verify_credentials(app, user)
|
||||
old_value = account["avatar"]
|
||||
|
||||
out = run("update_account", "--avatar", TRUMPET)
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["avatar"] != old_value
|
||||
|
||||
|
||||
def test_update_account_header(run, app, user):
|
||||
account = api.verify_credentials(app, user)
|
||||
old_value = account["header"]
|
||||
|
||||
out = run("update_account", "--header", TRUMPET)
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["header"] != old_value
|
||||
|
||||
|
||||
def test_update_account_locked(run, app, user):
|
||||
out = run("update_account", "--locked")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["locked"] is True
|
||||
|
||||
out = run("update_account", "--no-locked")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["locked"] is False
|
||||
|
||||
|
||||
def test_update_account_bot(run, app, user):
|
||||
out = run("update_account", "--bot")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["bot"] is True
|
||||
|
||||
out = run("update_account", "--no-bot")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["bot"] is False
|
||||
|
||||
|
||||
def test_update_account_discoverable(run, app, user):
|
||||
out = run("update_account", "--discoverable")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["discoverable"] is True
|
||||
|
||||
out = run("update_account", "--no-discoverable")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["discoverable"] is False
|
||||
|
||||
|
||||
def test_update_account_sensitive(run, app, user):
|
||||
out = run("update_account", "--sensitive")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["source"]["sensitive"] is True
|
||||
|
||||
out = run("update_account", "--no-sensitive")
|
||||
assert out == "✓ Account updated"
|
||||
|
||||
account = api.verify_credentials(app, user)
|
||||
assert account["source"]["sensitive"] is False
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Utils
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
strip_ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
|
||||
|
||||
|
||||
def strip_ansi(string):
|
||||
return strip_ansi_pattern.sub("", string).strip()
|
||||
|
||||
|
||||
def _posted_status_id(out):
|
||||
pattern = re.compile(r"Toot posted: http://([^/]+)/([^/]+)/(.+)")
|
||||
match = re.search(pattern, out)
|
||||
assert match
|
||||
|
||||
_, _, status_id = match.groups()
|
||||
|
||||
return status_id
|
42
toot/api.py
42
toot/api.py
@ -519,3 +519,45 @@ def clear_notifications(app, user):
|
||||
def get_instance(base_url):
|
||||
url = f"{base_url}/api/v1/instance"
|
||||
return http.anon_get(url).json()
|
||||
|
||||
|
||||
def get_lists(app, user):
|
||||
path = "/api/v1/lists"
|
||||
return _get_response_list(app, user, path)
|
||||
|
||||
|
||||
def find_list_id(app, user, title):
|
||||
lists = get_lists(app, user)
|
||||
for list_item in lists:
|
||||
if list_item["title"] == title:
|
||||
return list_item["id"]
|
||||
return None
|
||||
|
||||
|
||||
def get_list_accounts(app, user, list_id):
|
||||
path = f"/api/v1/lists/{list_id}/accounts"
|
||||
return _get_response_list(app, user, path)
|
||||
|
||||
|
||||
def create_list(app, user, title, replies_policy):
|
||||
url = "/api/v1/lists"
|
||||
json = {'title': title}
|
||||
if replies_policy:
|
||||
json['replies_policy'] = replies_policy
|
||||
return http.post(app, user, url, json=json).json()
|
||||
|
||||
|
||||
def delete_list(app, user, id):
|
||||
return http.delete(app, user, f"/api/v1/lists/{id}")
|
||||
|
||||
|
||||
def add_accounts_to_list(app, user, list_id, account_ids):
|
||||
url = f"/api/v1/lists/{list_id}/accounts"
|
||||
json = {'account_ids': account_ids}
|
||||
return http.post(app, user, url, json=json).json()
|
||||
|
||||
|
||||
def remove_accounts_from_list(app, user, list_id, account_ids):
|
||||
url = f"/api/v1/lists/{list_id}/accounts"
|
||||
json = {'account_ids': account_ids}
|
||||
return http.delete(app, user, url, json=json)
|
||||
|
@ -1,3 +1,4 @@
|
||||
|
||||
import sys
|
||||
import platform
|
||||
|
||||
@ -6,9 +7,9 @@ from time import sleep, time
|
||||
from toot import api, config, __version__
|
||||
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
|
||||
from toot.exceptions import ApiError, ConsoleError
|
||||
from toot.output import (print_out, print_instance, print_account, print_acct_list,
|
||||
print_search_results, print_timeline, print_notifications,
|
||||
print_tag_list)
|
||||
from toot.output import (print_lists, print_out, print_instance, print_account, print_acct_list,
|
||||
print_search_results, print_table, print_timeline, print_notifications,
|
||||
print_tag_list, print_list_accounts)
|
||||
from toot.tui.utils import parse_datetime
|
||||
from toot.utils import args_get_instance, delete_tmp_status_file, editor_input, multiline_input, EOF_KEY
|
||||
|
||||
@ -423,6 +424,82 @@ def tags_followed(app, user, args):
|
||||
print_tag_list(response)
|
||||
|
||||
|
||||
def lists(app, user, args):
|
||||
lists = api.get_lists(app, user)
|
||||
|
||||
if lists:
|
||||
print_lists(lists)
|
||||
else:
|
||||
print_out("You have no lists defined.")
|
||||
|
||||
|
||||
def list_accounts(app, user, args):
|
||||
list_id = args.id if args.id else api.find_list_id(app, user, args.title)
|
||||
if not list_id:
|
||||
print_out("<red>List not found</red>")
|
||||
return
|
||||
|
||||
response = api.get_list_accounts(app, user, list_id)
|
||||
print_list_accounts(response)
|
||||
|
||||
|
||||
def list_create(app, user, args):
|
||||
api.create_list(app, user, title=args.title, replies_policy=args.replies_policy)
|
||||
print_out(f"<green>✓ List \"{args.title}\" created.</green>")
|
||||
|
||||
|
||||
def list_delete(app, user, args):
|
||||
list_id = args.id if args.id else api.find_list_id(app, user, args.title)
|
||||
if not list_id:
|
||||
print_out("<red>List not found</red>")
|
||||
return
|
||||
|
||||
api.delete_list(app, user, list_id)
|
||||
print_out(f"<green>✓ List \"{args.title if args.title else args.id}\"</green> <red>deleted.</red>")
|
||||
|
||||
|
||||
def list_add(app, user, args):
|
||||
list_id = args.id if args.id else api.find_list_id(app, user, args.title)
|
||||
if not list_id:
|
||||
print_out("<red>List not found</red>")
|
||||
return
|
||||
account = find_account(app, user, args.account)
|
||||
if not account:
|
||||
print_out("<red>Account not found</red>")
|
||||
return
|
||||
try:
|
||||
api.add_accounts_to_list(app, user, list_id, [account['id']])
|
||||
except Exception as ex:
|
||||
# if we failed to add the account, try to give a
|
||||
# more specific error message than "record not found"
|
||||
my_accounts = api.followers(app, user, account['id'])
|
||||
found = False
|
||||
if my_accounts:
|
||||
for my_account in my_accounts:
|
||||
if my_account['id'] == account['id']:
|
||||
found = True
|
||||
break
|
||||
if found is False:
|
||||
print_out(f"<red>You must follow @{account['acct']} before adding this account to a list.</red>")
|
||||
else:
|
||||
print_out(f"<red>{ex}</red>")
|
||||
return
|
||||
print_out(f"<green>✓ Added account \"{args.account}\"</green>")
|
||||
|
||||
|
||||
def list_remove(app, user, args):
|
||||
list_id = args.id if args.id else api.find_list_id(app, user, args.title)
|
||||
if not list_id:
|
||||
print_out("<red>List not found</red>")
|
||||
return
|
||||
account = find_account(app, user, args.account)
|
||||
if not account:
|
||||
print_out("<red>Account not found</red>")
|
||||
return
|
||||
api.remove_accounts_from_list(app, user, list_id, [account['id']])
|
||||
print_out(f"<green>✓ Removed account \"{args.account}\"</green>")
|
||||
|
||||
|
||||
def mute(app, user, args):
|
||||
account = find_account(app, user, args.account)
|
||||
api.mute(app, user, account['id'])
|
||||
|
@ -724,6 +724,101 @@ TAG_COMMANDS = [
|
||||
),
|
||||
]
|
||||
|
||||
LIST_COMMANDS = [
|
||||
Command(
|
||||
name="lists",
|
||||
description="List all lists",
|
||||
arguments=[],
|
||||
require_auth=True,
|
||||
),
|
||||
Command(
|
||||
name="list_accounts",
|
||||
description="List the accounts in a list",
|
||||
arguments=[
|
||||
(["--id"], {
|
||||
"type": str,
|
||||
"help": "ID of the list"
|
||||
}),
|
||||
(["title"], {
|
||||
"type": str,
|
||||
"nargs": "?",
|
||||
"help": "title of the list"
|
||||
}),
|
||||
],
|
||||
require_auth=True,
|
||||
),
|
||||
Command(
|
||||
name="list_create",
|
||||
description="Create a list",
|
||||
arguments=[
|
||||
(["title"], {
|
||||
"type": str,
|
||||
"help": "title of the list"
|
||||
}),
|
||||
(["--replies-policy"], {
|
||||
"type": str,
|
||||
"help": "replies policy: 'followed', 'list', or 'none' (defaults to 'none')"
|
||||
}),
|
||||
],
|
||||
require_auth=True,
|
||||
),
|
||||
Command(
|
||||
name="list_delete",
|
||||
description="Delete a list",
|
||||
arguments=[
|
||||
(["--id"], {
|
||||
"type": str,
|
||||
"help": "ID of the list"
|
||||
}),
|
||||
(["title"], {
|
||||
"type": str,
|
||||
"nargs": "?",
|
||||
"help": "title of the list"
|
||||
}),
|
||||
],
|
||||
require_auth=True,
|
||||
),
|
||||
Command(
|
||||
name="list_add",
|
||||
description="Add account to list",
|
||||
arguments=[
|
||||
(["--id"], {
|
||||
"type": str,
|
||||
"help": "ID of the list"
|
||||
}),
|
||||
(["title"], {
|
||||
"type": str,
|
||||
"nargs": "?",
|
||||
"help": "title of the list"
|
||||
}),
|
||||
(["account"], {
|
||||
"type": str,
|
||||
"help": "Account to add"
|
||||
}),
|
||||
],
|
||||
require_auth=True,
|
||||
),
|
||||
Command(
|
||||
name="list_remove",
|
||||
description="Remove account from list",
|
||||
arguments=[
|
||||
(["--id"], {
|
||||
"type": str,
|
||||
"help": "ID of the list"
|
||||
}),
|
||||
(["title"], {
|
||||
"type": str,
|
||||
"nargs": "?",
|
||||
"help": "title of the list"
|
||||
}),
|
||||
(["account"], {
|
||||
"type": str,
|
||||
"help": "Account to remove"
|
||||
}),
|
||||
],
|
||||
require_auth=True,
|
||||
),
|
||||
]
|
||||
COMMAND_GROUPS = [
|
||||
("Authentication", AUTH_COMMANDS),
|
||||
("TUI", TUI_COMMANDS),
|
||||
@ -732,6 +827,7 @@ COMMAND_GROUPS = [
|
||||
("Status", STATUS_COMMANDS),
|
||||
("Accounts", ACCOUNTS_COMMANDS),
|
||||
("Hashtags", TAG_COMMANDS),
|
||||
("Lists", LIST_COMMANDS),
|
||||
]
|
||||
|
||||
COMMANDS = list(chain(*[commands for _, commands in COMMAND_GROUPS]))
|
||||
|
@ -92,13 +92,13 @@ def patch(app, user, path, headers=None, files=None, data=None, json=None):
|
||||
return process_response(response)
|
||||
|
||||
|
||||
def delete(app, user, path, data=None, headers=None):
|
||||
def delete(app, user, path, data=None, json=None, headers=None):
|
||||
url = app.base_url + path
|
||||
|
||||
headers = headers or {}
|
||||
headers["Authorization"] = f"Bearer {user.access_token}"
|
||||
|
||||
request = Request('DELETE', url, headers=headers, json=data)
|
||||
request = Request('DELETE', url, headers=headers, data=data, json=json)
|
||||
response = send_request(request)
|
||||
|
||||
return process_response(response)
|
||||
|
@ -3,9 +3,10 @@ import re
|
||||
import sys
|
||||
import textwrap
|
||||
|
||||
from toot.tui.utils import parse_datetime
|
||||
from typing import List
|
||||
from wcwidth import wcswidth
|
||||
|
||||
from toot.tui.utils import parse_datetime
|
||||
from toot.utils import get_text, parse_html
|
||||
from toot.wcstring import wc_wrap
|
||||
|
||||
@ -210,6 +211,43 @@ def print_tag_list(tags):
|
||||
print_out("You're not following any hashtags.")
|
||||
|
||||
|
||||
def print_lists(lists):
|
||||
headers = ["ID", "Title", "Replies"]
|
||||
data = [[lst["id"], lst["title"], lst["replies_policy"]] for lst in lists]
|
||||
print_table(headers, data)
|
||||
|
||||
|
||||
def print_table(headers: List[str], data: List[List[str]]):
|
||||
widths = [[len(cell) for cell in row] for row in data + [headers]]
|
||||
widths = [max(width) for width in zip(*widths)]
|
||||
|
||||
def style(string, tag):
|
||||
return f"<{tag}>{string}</{tag}>" if tag else string
|
||||
|
||||
def print_row(row, tag=None):
|
||||
for idx, cell in enumerate(row):
|
||||
width = widths[idx]
|
||||
print_out(style(cell.ljust(width), tag), end="")
|
||||
print_out(" ", end="")
|
||||
print_out()
|
||||
|
||||
underlines = ["-" * width for width in widths]
|
||||
|
||||
print_row(headers, "bold")
|
||||
print_row(underlines, "dim")
|
||||
|
||||
for row in data:
|
||||
print_row(row)
|
||||
|
||||
|
||||
def print_list_accounts(accounts):
|
||||
if accounts:
|
||||
print_out("Accounts in list</green>:\n")
|
||||
print_acct_list(accounts)
|
||||
else:
|
||||
print_out("This list has no accounts.")
|
||||
|
||||
|
||||
def print_search_results(results):
|
||||
accounts = results['accounts']
|
||||
hashtags = results['hashtags']
|
||||
|
@ -398,7 +398,9 @@ class TUI(urwid.Frame):
|
||||
|
||||
def show_goto_menu(self):
|
||||
user_timelines = self.config.get("timelines", {})
|
||||
menu = GotoMenu(user_timelines)
|
||||
user_lists = api.get_lists(self.app, self.user) or []
|
||||
|
||||
menu = GotoMenu(user_timelines, user_lists)
|
||||
urwid.connect_signal(menu, "home_timeline",
|
||||
lambda x: self.goto_home_timeline())
|
||||
urwid.connect_signal(menu, "public_timeline",
|
||||
@ -411,10 +413,12 @@ class TUI(urwid.Frame):
|
||||
lambda x, local: self.goto_conversations())
|
||||
urwid.connect_signal(menu, "hashtag_timeline",
|
||||
lambda x, tag, local: self.goto_tag_timeline(tag, local=local))
|
||||
urwid.connect_signal(menu, "list_timeline",
|
||||
lambda x, list_item: self.goto_list_timeline(list_item))
|
||||
|
||||
self.open_overlay(menu, title="Go to", options=dict(
|
||||
align="center", width=("relative", 60),
|
||||
valign="middle", height=16 + len(user_timelines),
|
||||
valign="middle", height=17 + len(user_timelines) + len(user_lists),
|
||||
))
|
||||
|
||||
def show_help(self):
|
||||
@ -468,6 +472,13 @@ class TUI(urwid.Frame):
|
||||
)
|
||||
promise.add_done_callback(lambda *args: self.close_overlay())
|
||||
|
||||
def goto_list_timeline(self, list_item):
|
||||
self.timeline_generator = api.timeline_list_generator(
|
||||
self.app, self.user, list_item['id'], limit=40)
|
||||
promise = self.async_load_timeline(
|
||||
is_initial=True, timeline_name=f"\N{clipboard}{list_item['title']}")
|
||||
promise.add_done_callback(lambda *args: self.close_overlay())
|
||||
|
||||
def show_media(self, status):
|
||||
urls = [m["url"] for m in status.original.data["media_attachments"]]
|
||||
if urls:
|
||||
@ -660,12 +671,19 @@ class TUI(urwid.Frame):
|
||||
|
||||
def refresh_timeline(self):
|
||||
# No point in refreshing the bookmarks timeline
|
||||
if not self.timeline or self.timeline.name == 'bookmarks':
|
||||
# and we don't have a good way to refresh a
|
||||
# list timeline yet (no reference to list ID kept)
|
||||
if (not self.timeline
|
||||
or self.timeline.name == 'bookmarks'
|
||||
or self.timeline.name.startswith("\N{clipboard}")):
|
||||
return
|
||||
|
||||
if self.timeline.name.startswith("#"):
|
||||
self.timeline_generator = api.tag_timeline_generator(
|
||||
self.app, self.user, self.timeline.name[1:], limit=40)
|
||||
elif self.timeline.name.startswith("\N{clipboard}"):
|
||||
self.timeline_generator = api.tag_timeline_generator(
|
||||
self.app, self.user, self.timeline.name[1:], limit=40)
|
||||
else:
|
||||
if self.timeline.name.endswith("public"):
|
||||
self.timeline_generator = api.public_timeline_generator(
|
||||
|
@ -102,20 +102,21 @@ class GotoMenu(urwid.ListBox):
|
||||
"bookmark_timeline",
|
||||
"notification_timeline",
|
||||
"conversation_timeline",
|
||||
"list_timeline",
|
||||
]
|
||||
|
||||
def __init__(self, user_timelines):
|
||||
def __init__(self, user_timelines, user_lists):
|
||||
self.hash_edit = EditBox(caption="Hashtag: ")
|
||||
self.message_widget = urwid.Text("")
|
||||
|
||||
actions = list(self.generate_actions(user_timelines))
|
||||
actions = list(self.generate_actions(user_timelines, user_lists))
|
||||
walker = urwid.SimpleFocusListWalker(actions)
|
||||
super().__init__(walker)
|
||||
|
||||
def get_hashtag(self):
|
||||
return self.hash_edit.edit_text.strip().lstrip("#")
|
||||
|
||||
def generate_actions(self, user_timelines):
|
||||
def generate_actions(self, user_timelines, user_lists):
|
||||
def _home(button):
|
||||
self._emit("home_timeline")
|
||||
|
||||
@ -147,6 +148,11 @@ class GotoMenu(urwid.ListBox):
|
||||
self._emit("hashtag_timeline", tag, local)
|
||||
return on_press
|
||||
|
||||
def mk_on_press_user_list(list_item):
|
||||
def on_press(btn):
|
||||
self._emit("list_timeline", list_item)
|
||||
return on_press
|
||||
|
||||
yield Button("Home timeline", on_press=_home)
|
||||
yield Button("Local public timeline", on_press=_local_public)
|
||||
yield Button("Global public timeline", on_press=_global_public)
|
||||
@ -164,6 +170,10 @@ class GotoMenu(urwid.ListBox):
|
||||
yield Button(f"#{tag}" + (" (local)" if is_local else ""),
|
||||
on_press=mk_on_press_user_hashtag(tag, is_local))
|
||||
|
||||
for list_item in user_lists:
|
||||
yield Button(f"\N{clipboard}{list_item['title']}",
|
||||
on_press=mk_on_press_user_list(list_item))
|
||||
|
||||
yield urwid.Divider()
|
||||
yield self.hash_edit
|
||||
yield Button("Local hashtag timeline", on_press=lambda x: _hashtag(True))
|
||||
|
@ -1,4 +1,5 @@
|
||||
import urwid
|
||||
from wcwidth import wcswidth
|
||||
|
||||
|
||||
class Clickable:
|
||||
@ -40,12 +41,12 @@ class Button(urwid.AttrWrap):
|
||||
"""Styled button."""
|
||||
def __init__(self, *args, **kwargs):
|
||||
button = urwid.Button(*args, **kwargs)
|
||||
padding = urwid.Padding(button, width=len(args[0]) + 4)
|
||||
padding = urwid.Padding(button, width=wcswidth(args[0]) + 4)
|
||||
return super().__init__(padding, "button", "button_focused")
|
||||
|
||||
def set_label(self, *args, **kwargs):
|
||||
self.original_widget.original_widget.set_label(*args, **kwargs)
|
||||
self.original_widget.width = len(args[0]) + 4
|
||||
self.original_widget.width = wcswidth(args[0]) + 4
|
||||
|
||||
|
||||
class CheckBox(urwid.AttrWrap):
|
||||
|
Loading…
Reference in New Issue
Block a user