From 2429d9f751dd4cf7001d2cd4b52a6970eb7d385e Mon Sep 17 00:00:00 2001 From: Ivan Habunek Date: Sun, 3 Dec 2023 07:07:18 +0100 Subject: [PATCH] Migrate timeline commands --- Makefile | 2 +- tests/integration/conftest.py | 12 +- tests/integration/test_timelines.py | 198 ++++++++++++++++++++++++++++ toot/api.py | 70 +++++++--- toot/cli/__init__.py | 6 +- toot/cli/base.py | 2 +- toot/cli/read.py | 1 + toot/cli/timelines.py | 180 +++++++++++++++++++++++++ 8 files changed, 448 insertions(+), 23 deletions(-) create mode 100644 tests/integration/test_timelines.py create mode 100644 toot/cli/timelines.py diff --git a/Makefile b/Makefile index 4b09396..e560506 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ test: coverage: coverage erase coverage run - coverage html --omit toot/tui/* + coverage html --omit "toot/tui/*" coverage report clean : diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index d6e08dc..528be6a 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -115,12 +115,20 @@ def runner(): @pytest.fixture def run(app, user, runner): - def _run(command, *params, as_user=None, input=None) -> Result: - ctx = Context(app, as_user or user) + def _run(command, *params, input=None) -> Result: + ctx = Context(app, user) return runner.invoke(command, params, obj=ctx, input=input) return _run +@pytest.fixture +def run_as(app, runner): + def _run_as(user, command, *params, input=None) -> Result: + ctx = Context(app, user) + return runner.invoke(command, params, obj=ctx, input=input) + return _run_as + + @pytest.fixture def run_json(app, user, runner): def _run_json(command, *params): diff --git a/tests/integration/test_timelines.py b/tests/integration/test_timelines.py new file mode 100644 index 0000000..5f4da1e --- /dev/null +++ b/tests/integration/test_timelines.py @@ -0,0 +1,198 @@ +import pytest + +from time import sleep +from uuid import uuid4 + +from toot import api, cli +from toot.entities import from_dict, Status +from tests.integration.conftest import TOOT_TEST_BASE_URL, register_account + + +# TODO: If fixture is not overriden here, tests fail, not sure why, figure it out +@pytest.fixture(scope="module") +def user(app): + return register_account(app) + + +@pytest.fixture(scope="module") +def other_user(app): + return register_account(app) + + +@pytest.fixture(scope="module") +def friend_user(app, user): + friend = register_account(app) + friend_account = api.find_account(app, user, friend.username) + api.follow(app, user, friend_account["id"]) + return friend + + +@pytest.fixture(scope="module") +def friend_list(app, user, friend_user): + friend_account = api.find_account(app, user, friend_user.username) + list = api.create_list(app, user, str(uuid4())) + api.add_accounts_to_list(app, user, list["id"], account_ids=[friend_account["id"]]) + return list + + +def test_timelines(app, user, other_user, friend_user, friend_list, run): + status1 = _post_status(app, user, "#foo") + status2 = _post_status(app, other_user, "#bar") + status3 = _post_status(app, friend_user, "#foo #bar") + + # Give mastodon time to process things :/ + # Tests fail if this is removed, required delay depends on server speed + sleep(1) + + # Home timeline + result = run(cli.timeline) + assert result.exit_code == 0 + assert status1.id in result.stdout + assert status2.id not in result.stdout + assert status3.id in result.stdout + + # Public timeline + result = run(cli.timeline, "--public") + assert result.exit_code == 0 + assert status1.id in result.stdout + assert status2.id in result.stdout + assert status3.id in result.stdout + + # Anon public timeline + result = run(cli.timeline, "--instance", TOOT_TEST_BASE_URL, "--public") + assert result.exit_code == 0 + assert status1.id in result.stdout + assert status2.id in result.stdout + assert status3.id in result.stdout + + # Tag timeline + result = run(cli.timeline, "--tag", "foo") + assert result.exit_code == 0 + assert status1.id in result.stdout + assert status2.id not in result.stdout + assert status3.id in result.stdout + + result = run(cli.timeline, "--tag", "bar") + assert result.exit_code == 0 + assert status1.id not in result.stdout + assert status2.id in result.stdout + assert status3.id in result.stdout + + # Anon tag timeline + result = run(cli.timeline, "--instance", TOOT_TEST_BASE_URL, "--tag", "foo") + assert result.exit_code == 0 + assert status1.id in result.stdout + assert status2.id not in result.stdout + assert status3.id in result.stdout + + # List timeline (by list name) + result = run(cli.timeline, "--list", friend_list["title"]) + assert result.exit_code == 0 + assert status1.id not in result.stdout + assert status2.id not in result.stdout + assert status3.id in result.stdout + + # List timeline (by list ID) + result = run(cli.timeline, "--list", friend_list["id"]) + assert result.exit_code == 0 + assert status1.id not in result.stdout + assert status2.id not in result.stdout + assert status3.id in result.stdout + + # Account timeline + result = run(cli.timeline, "--account", friend_user.username) + assert result.exit_code == 0 + assert status1.id not in result.stdout + assert status2.id not in result.stdout + assert status3.id in result.stdout + + result = run(cli.timeline, "--account", other_user.username) + assert result.exit_code == 0 + assert status1.id not in result.stdout + assert status2.id in result.stdout + assert status3.id not in result.stdout + + +def test_empty_timeline(app, run_as): + user = register_account(app) + result = run_as(user, cli.timeline) + assert result.exit_code == 0 + assert result.stdout.strip() == "─" * 100 + + +def test_timeline_cant_combine_timelines(run): + result = run(cli.timeline, "--tag", "foo", "--account", "bar") + assert result.exit_code == 1 + assert result.stderr.strip() == "Error: Only one of --public, --tag, --account, or --list can be used at one time." + + +def test_timeline_local_needs_public_or_tag(run): + result = run(cli.timeline, "--local") + assert result.exit_code == 1 + assert result.stderr.strip() == "Error: The --local option is only valid alongside --public or --tag." + + +def test_timeline_instance_needs_public_or_tag(run): + result = run(cli.timeline, "--instance", TOOT_TEST_BASE_URL) + assert result.exit_code == 1 + assert result.stderr.strip() == "Error: The --instance option is only valid alongside --public or --tag." + + +def test_bookmarks(app, user, run): + status1 = _post_status(app, user) + status2 = _post_status(app, user) + + api.bookmark(app, user, status1.id) + api.bookmark(app, user, status2.id) + + result = run(cli.bookmarks) + assert result.exit_code == 0 + assert status1.id in result.stdout + assert status2.id in result.stdout + assert result.stdout.find(status1.id) > result.stdout.find(status2.id) + + + result = run(cli.bookmarks, "--reverse") + assert result.exit_code == 0 + assert status1.id in result.stdout + assert status2.id in result.stdout + assert result.stdout.find(status1.id) < result.stdout.find(status2.id) + + +def test_notifications(app, user, other_user, run): + result = run(cli.notifications) + assert result.exit_code == 0 + assert result.stdout.strip() == "You have no notifications" + + text = f"Paging doctor @{user.username}" + status = _post_status(app, other_user, text) + sleep(0.5) # grr + + result = run(cli.notifications) + assert result.exit_code == 0 + assert f"@{other_user.username} mentioned you" in result.stdout + assert status.id in result.stdout + assert text in result.stdout + + result = run(cli.notifications, "--mentions") + assert result.exit_code == 0 + assert f"@{other_user.username} mentioned you" in result.stdout + assert status.id in result.stdout + assert text in result.stdout + + +def test_notifications_follow(app, user, friend_user, run_as): + result = run_as(friend_user, cli.notifications) + assert result.exit_code == 0 + assert f"@{user.username} now follows you" in result.stdout + + + result = run_as(friend_user, cli.notifications, "--mentions") + assert result.exit_code == 0 + assert "now follows you" not in result.stdout + + +def _post_status(app, user, text=None) -> Status: + text = text or str(uuid4()) + response = api.post_status(app, user, text) + return from_dict(Status, response.json()) diff --git a/toot/api.py b/toot/api.py index b7136d6..34b6bbd 100644 --- a/toot/api.py +++ b/toot/api.py @@ -8,7 +8,7 @@ from typing import BinaryIO, List, Optional from urllib.parse import urlparse, urlencode, quote from toot import App, User, http, CLIENT_NAME, CLIENT_WEBSITE -from toot.exceptions import AuthenticationError, ConsoleError +from toot.exceptions import ConsoleError from toot.utils import drop_empty_values, str_bool, str_bool_nullable @@ -300,6 +300,35 @@ def reblogged_by(app, user, status_id) -> Response: return http.get(app, user, url) +def get_timeline_generator( + app: Optional[App], + user: Optional[User], + base_url: Optional[str] = None, + account: Optional[str] = None, + list_id: Optional[str] = None, + tag: Optional[str] = None, + local: bool = False, + public: bool = False, + limit=20, # TODO +): + if public: + if base_url: + return anon_public_timeline_generator(base_url, local=local, limit=limit) + else: + return public_timeline_generator(app, user, local=local, limit=limit) + elif tag: + if base_url: + return anon_tag_timeline_generator(base_url, tag, limit=limit) + else: + return tag_timeline_generator(app, user, tag, local=local, limit=limit) + elif account: + return account_timeline_generator(app, user, account, limit=limit) + elif list_id: + return timeline_list_generator(app, user, list_id, limit=limit) + else: + return home_timeline_generator(app, user, limit=limit) + + def _get_next_path(headers): """Given timeline response headers, returns the path to the next batch""" links = headers.get('Link', '') @@ -309,6 +338,14 @@ def _get_next_path(headers): return "?".join([parsed.path, parsed.query]) +def _get_next_url(headers) -> Optional[str]: + """Given timeline response headers, returns the url to the next batch""" + links = headers.get('Link', '') + match = re.match('<([^>]+)>; rel="next"', links) + if match: + return match.group(1) + + def _timeline_generator(app, user, path, params=None): while path: response = http.get(app, user, path, params) @@ -369,7 +406,7 @@ def conversation_timeline_generator(app, user, limit=20): return _conversation_timeline_generator(app, user, path, params) -def account_timeline_generator(app: App, user: User, account_name: str, replies=False, reblogs=False, limit=20): +def account_timeline_generator(app, user, account_name: str, replies=False, reblogs=False, limit=20): account = find_account(app, user, account_name) path = f"/api/v1/accounts/{account['id']}/statuses" params = {"limit": limit, "exclude_replies": not replies, "exclude_reblogs": not reblogs} @@ -381,24 +418,23 @@ def timeline_list_generator(app, user, list_id, limit=20): return _timeline_generator(app, user, path, {'limit': limit}) -def _anon_timeline_generator(instance, path, params=None): - while path: - url = f"https://{instance}{path}" +def _anon_timeline_generator(url, params=None): + while url: response = http.anon_get(url, params) yield response.json() - path = _get_next_path(response.headers) + url = _get_next_url(response.headers) -def anon_public_timeline_generator(instance, local=False, limit=20): - path = '/api/v1/timelines/public' - params = {'local': str_bool(local), 'limit': limit} - return _anon_timeline_generator(instance, path, params) +def anon_public_timeline_generator(base_url, local=False, limit=20): + query = urlencode({"local": str_bool(local), "limit": limit}) + url = f"{base_url}/api/v1/timelines/public?{query}" + return _anon_timeline_generator(url) -def anon_tag_timeline_generator(instance, hashtag, local=False, limit=20): - path = f"/api/v1/timelines/tag/{quote(hashtag)}" - params = {'local': str_bool(local), 'limit': limit} - return _anon_timeline_generator(instance, path, params) +def anon_tag_timeline_generator(base_url, hashtag, local=False, limit=20): + query = urlencode({"local": str_bool(local), "limit": limit}) + url = f"{base_url}/api/v1/timelines/tag/{quote(hashtag)}?{query}" + return _anon_timeline_generator(url) def get_media(app: App, user: User, id: str): @@ -538,8 +574,8 @@ def verify_credentials(app, user) -> Response: return http.get(app, user, '/api/v1/accounts/verify_credentials') -def get_notifications(app, user, exclude_types=[], limit=20): - params = {"exclude_types[]": exclude_types, "limit": limit} +def get_notifications(app, user, types=[], exclude_types=[], limit=20): + params = {"types[]": types, "exclude_types[]": exclude_types, "limit": limit} return http.get(app, user, '/api/v1/notifications', params).json() @@ -570,7 +606,7 @@ def get_list_accounts(app, user, list_id): return _get_response_list(app, user, path) -def create_list(app, user, title, replies_policy): +def create_list(app, user, title, replies_policy="none"): url = "/api/v1/lists" json = {'title': title} if replies_policy: diff --git a/toot/cli/__init__.py b/toot/cli/__init__.py index 2e6451c..d4d14ed 100644 --- a/toot/cli/__init__.py +++ b/toot/cli/__init__.py @@ -1,9 +1,11 @@ -from toot.cli.base import cli, Context # noqa +# flake8: noqa +from toot.cli.base import cli, Context -from toot.cli.auth import * from toot.cli.accounts import * +from toot.cli.auth import * from toot.cli.lists import * from toot.cli.post import * from toot.cli.read import * from toot.cli.statuses import * from toot.cli.tags import * +from toot.cli.timelines import * diff --git a/toot/cli/base.py b/toot/cli/base.py index c86b531..2cd86b9 100644 --- a/toot/cli/base.py +++ b/toot/cli/base.py @@ -40,7 +40,7 @@ CONTEXT = dict( # Data object to add to Click context class Context(NamedTuple): - app: Optional[App] = None + app: Optional[App] user: Optional[User] = None color: bool = False debug: bool = False diff --git a/toot/cli/read.py b/toot/cli/read.py index f83d449..c68eaea 100644 --- a/toot/cli/read.py +++ b/toot/cli/read.py @@ -74,6 +74,7 @@ def instance(ctx: Context, instance_url: Optional[str], json: bool): @json_option @pass_context def search(ctx: Context, query: str, resolve: bool, json: bool): + """Search for users or hashtags""" response = api.search(ctx.app, ctx.user, query, resolve) if json: print(response.text) diff --git a/toot/cli/timelines.py b/toot/cli/timelines.py new file mode 100644 index 0000000..ffb382d --- /dev/null +++ b/toot/cli/timelines.py @@ -0,0 +1,180 @@ +import sys +import click + +from toot import api +from toot.cli.base import cli, pass_context, Context +from typing import Optional +from toot.cli.validators import validate_instance + +from toot.entities import Notification, Status, from_dict +from toot.output import print_notifications, print_timeline + + +@cli.command() +@click.option( + "--instance", "-i", + callback=validate_instance, + help="""Domain or base URL of the instance from which to read, + e.g. 'mastodon.social' or 'https://mastodon.social'""", +) +@click.option("--account", "-a", help="Show account timeline") +@click.option("--list", help="Show list timeline") +@click.option("--tag", "-t", help="Show hashtag timeline") +@click.option("--public", "-p", is_flag=True, help="Show public timeline") +@click.option( + "--local", "-l", is_flag=True, + help="Show only statuses from the local instance (public and tag timelines only)" +) +@click.option( + "--reverse", "-r", is_flag=True, + help="Reverse the order of the shown timeline (new posts at the bottom)" +) +@click.option( + "--once", "-1", is_flag=True, + help="Only show the first toots, do not prompt to continue" +) +@click.option( + "--count", "-c", type=int, default=10, + help="Number of posts per page (max 20)" +) +@pass_context +def timeline( + ctx: Context, + instance: Optional[str], + account: Optional[str], + list: Optional[str], + tag: Optional[str], + public: bool, + local: bool, + reverse: bool, + once: bool, + count: int, +): + """Show recent items in a timeline + + By default shows the home timeline. + """ + if len([arg for arg in [tag, list, public, account] if arg]) > 1: + raise click.ClickException("Only one of --public, --tag, --account, or --list can be used at one time.") + + if local and not (public or tag): + raise click.ClickException("The --local option is only valid alongside --public or --tag.") + + if instance and not (public or tag): + raise click.ClickException("The --instance option is only valid alongside --public or --tag.") + + list_id = _get_list_id(ctx, list) + + """Show recent statuses in a timeline""" + generator = api.get_timeline_generator( + ctx.app, + ctx.user, + base_url=instance, + account=account, + list_id=list_id, + tag=tag, + public=public, + local=local, + limit=count, + ) + + _show_timeline(generator, reverse, once) + + +@cli.command() +@click.option( + "--reverse", "-r", is_flag=True, + help="Reverse the order of the shown timeline (new posts at the bottom)" +) +@click.option( + "--once", "-1", is_flag=True, + help="Only show the first toots, do not prompt to continue" +) +@click.option( + "--count", "-c", type=int, default=10, + help="Number of posts per page (max 20)" +) +@pass_context +def bookmarks( + ctx: Context, + reverse: bool, + once: bool, + count: int, +): + """Show recent statuses in a timeline""" + generator = api.bookmark_timeline_generator(ctx.app, ctx.user, limit=count) + _show_timeline(generator, reverse, once) + + +@cli.command() +@click.option("--clear", help="Dismiss all notifications and exit") +@click.option( + "--reverse", "-r", is_flag=True, + help="Reverse the order of the shown notifications (newest on top)" +) +@click.option( + "--mentions", "-m", is_flag=True, + help="Show only mentions" +) +@pass_context +def notifications( + ctx: Context, + clear: bool, + reverse: bool, + mentions: int, +): + """Show notifications""" + if clear: + api.clear_notifications(ctx.app, ctx.user) + click.secho("✓ Notifications cleared", fg="green") + return + + exclude = [] + if mentions: + # Filter everything except mentions + # https://docs.joinmastodon.org/methods/notifications/ + exclude = ["follow", "favourite", "reblog", "poll", "follow_request"] + + notifications = api.get_notifications(ctx.app, ctx.user, exclude_types=exclude) + + if not notifications: + click.echo("You have no notifications") + return + + if reverse: + notifications = reversed(notifications) + + notifications = [from_dict(Notification, n) for n in notifications] + print_notifications(notifications) + + +def _show_timeline(generator, reverse, once): + while True: + try: + items = next(generator) + except StopIteration: + click.echo("That's all folks.") + return + + if reverse: + items = reversed(items) + + statuses = [from_dict(Status, item) for item in items] + print_timeline(statuses) + + if once or not sys.stdout.isatty(): + break + + char = input("\nContinue? [Y/n] ") + if char.lower() == "n": + break + + +def _get_list_id(ctx: Context, value: Optional[str]) -> Optional[str]: + if not value: + return None + + lists = api.get_lists(ctx.app, ctx.user) + for list in lists: + if list["id"] == value or list["title"] == value: + return list["id"]