mirror of
https://github.com/ihabunek/toot.git
synced 2025-02-02 15:07:51 -05:00
Add tags timeline command
This commit is contained in:
parent
bb32a691b7
commit
561bc6d7b5
127
toot/cli/tags.py
127
toot/cli/tags.py
@ -1,10 +1,15 @@
|
||||
import click
|
||||
import json as pyjson
|
||||
from typing import Optional
|
||||
from urllib.parse import quote
|
||||
|
||||
from toot import api
|
||||
from toot.cli import cli, pass_context, json_option, Context
|
||||
from toot.entities import Tag, from_dict
|
||||
from toot.output import print_tag_list
|
||||
import click
|
||||
|
||||
from toot import api, http
|
||||
from toot.cli import Context, cli, json_option, pass_context
|
||||
from toot.cli.validators import validate_positive
|
||||
from toot.entities import Status, Tag, from_dict, from_response_list, from_responses_batched
|
||||
from toot.output import green, print_tag_list, print_timeline, yellow
|
||||
from toot.utils import drop_empty_values, str_bool_nullable
|
||||
|
||||
|
||||
@cli.group()
|
||||
@ -128,3 +133,115 @@ def unfeature(ctx: Context, tag: str, json: bool):
|
||||
click.echo(response.text)
|
||||
else:
|
||||
click.secho(f"✓ Tag #{featured_tag['name']} is no longer featured", fg="green")
|
||||
|
||||
|
||||
@tags.command()
|
||||
@click.argument("tag_name")
|
||||
@click.option(
|
||||
"-l",
|
||||
"--local",
|
||||
is_flag=True,
|
||||
help="Return only local statuses",
|
||||
)
|
||||
@click.option(
|
||||
"-r",
|
||||
"--remote",
|
||||
is_flag=True,
|
||||
help="Return only remote statuses",
|
||||
)
|
||||
@click.option(
|
||||
"-m",
|
||||
"--media",
|
||||
is_flag=True,
|
||||
help="Return only statuses with media attachments",
|
||||
)
|
||||
@click.option(
|
||||
"-n",
|
||||
"--limit",
|
||||
type=int,
|
||||
default=20,
|
||||
help="Number of results to fetch per request [max: 40]",
|
||||
)
|
||||
@click.option(
|
||||
"-p",
|
||||
"--pager",
|
||||
help="Page the results, optionally define how many results to show per page",
|
||||
type=int,
|
||||
callback=validate_positive,
|
||||
is_flag=False,
|
||||
flag_value=10,
|
||||
)
|
||||
@click.option(
|
||||
"-c",
|
||||
"--clear",
|
||||
help="Clear the screen before printing",
|
||||
is_flag=True,
|
||||
)
|
||||
@json_option
|
||||
@pass_context
|
||||
def timeline(
|
||||
ctx: Context,
|
||||
tag_name: str,
|
||||
local: bool,
|
||||
remote: bool,
|
||||
media: bool,
|
||||
limit: int,
|
||||
pager: Optional[int],
|
||||
clear: bool,
|
||||
json: bool,
|
||||
):
|
||||
"""View hashtag timeline"""
|
||||
# TODO: Add `any`, `all`, and `none` params
|
||||
# TODO: Add `max_id`, `since_id`, and `min_id` params
|
||||
path = f"/api/v1/timelines/tag/{quote(tag_name)}"
|
||||
|
||||
params = drop_empty_values(
|
||||
{
|
||||
"local": str_bool_nullable(local),
|
||||
"remote": str_bool_nullable(remote),
|
||||
"media": str_bool_nullable(media),
|
||||
"limit": limit,
|
||||
}
|
||||
)
|
||||
|
||||
if json:
|
||||
response = http.get(ctx.app, ctx.user, path, params)
|
||||
click.echo(response.text)
|
||||
return
|
||||
|
||||
if pager:
|
||||
first = True
|
||||
printed_any = False
|
||||
|
||||
responses = http.get_paged(ctx.app, ctx.user, path, params)
|
||||
for page in from_responses_batched(responses, Status, pager):
|
||||
if not first and not get_continue():
|
||||
break
|
||||
if clear:
|
||||
click.clear()
|
||||
print_timeline(page)
|
||||
first = False
|
||||
printed_any = True
|
||||
|
||||
if not printed_any:
|
||||
click.echo("No statuses found containing the given tag")
|
||||
return
|
||||
|
||||
response = http.get(ctx.app, ctx.user, path, params)
|
||||
statuses = from_response_list(Status, response)
|
||||
if statuses:
|
||||
print_timeline(statuses)
|
||||
if len(statuses) == limit:
|
||||
click.secho("There may be more results. Increase the --limit or use --pager to see the rest.", dim=True)
|
||||
else:
|
||||
click.echo("No statuses found containing the given tag")
|
||||
|
||||
|
||||
def get_continue():
|
||||
click.secho(f"Press {green('Space')} or {green('Enter')} to continue, {yellow('Esc')} or {yellow('q')} to break.")
|
||||
while True:
|
||||
char = click.getchar()
|
||||
if char == ' ' or char == '\r':
|
||||
return True
|
||||
if char == '\x1b' or char == 'q':
|
||||
return False
|
||||
|
@ -91,3 +91,9 @@ def validate_cache_size(ctx: click.Context, param: str, value: Optional[str]) ->
|
||||
elif size < 1:
|
||||
raise click.BadParameter("Cache size too small: 1MB minimum.")
|
||||
return size
|
||||
|
||||
|
||||
def validate_positive(_ctx: click.Context, _param: click.Parameter, value: Optional[int]):
|
||||
if value is not None and value <= 0:
|
||||
raise click.BadParameter("must be greater than 0")
|
||||
return value
|
||||
|
@ -19,7 +19,7 @@ from typing import get_args, get_origin, get_type_hints
|
||||
|
||||
from requests import Response
|
||||
|
||||
from toot.utils import get_text
|
||||
from toot.utils import batched, get_text
|
||||
from toot.utils.datetime import parse_datetime
|
||||
|
||||
# Generic data class instance
|
||||
@ -513,6 +513,19 @@ def from_response_list(cls: Type[T], response: Response) -> t.List[T]:
|
||||
"""Convert a list of nested dicts extracted from response body into a list of `cls` instances."""
|
||||
return from_dict_list(cls, response.json())
|
||||
|
||||
def from_responses_batched(
|
||||
responses: t.Iterable[Response],
|
||||
cls: Type[T],
|
||||
page_size: int,
|
||||
) -> t.Generator[t.List[T], None, None]:
|
||||
def _gen():
|
||||
for response in responses:
|
||||
statuses = from_dict_list(cls, response.json())
|
||||
for status in statuses:
|
||||
yield status
|
||||
|
||||
yield from batched(_gen(), page_size)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def _get_fields(cls: type) -> t.List[Field]:
|
||||
|
Loading…
x
Reference in New Issue
Block a user