Provide common replacement utils / functions many projects will need #65

@sethmlarson

Some utility functions I was thinking about:

def azip(*iterables):  # Convert any iterables and async iterables into a zipped async iterable
    iterators = [aiter(x) for x in iterables]

    async def generator():
        if not iterators:  # Match zip(): no inputs means no output, not endless empty tuples
            return
        while True:
            try:
                yield tuple([await x.__anext__() for x in iterators])
            except StopAsyncIteration:
                break

    return generator().__aiter__()

def aiter(x):  # Convert any iterable or async iterable into an async iterator
    if hasattr(x, "__aiter__"):
        return x.__aiter__()
    elif hasattr(x, "__anext__"):
        return x

    async def aiter_wrapper():
        for item in x:
            yield item

    return aiter_wrapper().__aiter__()

async def anext(i):  # Advance an async iterator, good for mapping to `next()`
    return await i.__anext__()


from inspect import iscoroutine  # needed by await_or_return below


async def await_or_return(x):  # Maps to 'return_()'?
    if iscoroutine(x):
        return await x
    return x


# All of the sync variations of the above functions
next = next
iter = iter
zip = zip
def return_(x):
    return x
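
A rough sketch of how the async helpers above might be used together. The helpers are repeated so the snippet runs on its own; `countdown` and `main` are hypothetical names made up for the demo, and `asyncio.run` assumes Python 3.7+.

```python
import asyncio
from inspect import iscoroutine


def aiter(x):
    # Wrap a sync or async iterable so it can be driven with __anext__().
    if hasattr(x, "__aiter__"):
        return x.__aiter__()
    elif hasattr(x, "__anext__"):
        return x

    async def aiter_wrapper():
        for item in x:
            yield item

    return aiter_wrapper().__aiter__()


def azip(*iterables):
    iterators = [aiter(x) for x in iterables]

    async def generator():
        if not iterators:  # like zip(): no inputs, no output
            return
        while True:
            try:
                yield tuple([await it.__anext__() for it in iterators])
            except StopAsyncIteration:
                break  # stop at the shortest input

    return generator().__aiter__()


async def await_or_return(x):
    if iscoroutine(x):
        return await x
    return x


async def countdown(n):  # hypothetical async iterable for the demo
    while n > 0:
        yield n
        n -= 1


async def main():
    # Mix a plain string (sync iterable) with an async generator.
    pairs = [pair async for pair in azip("abc", countdown(3))]
    # A coroutine is awaited, a plain value is passed through unchanged.
    awaited = await await_or_return(asyncio.sleep(0, result=42))
    plain = await await_or_return(7)
    return pairs, awaited, plain


pairs, awaited, plain = asyncio.run(main())
print(pairs, awaited, plain)
```

In the sync build, the same call sites would presumably become `zip(...)` and `return_(...)`.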

Could also have azip_longest -> zip_longest, etc.
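
One possible shape for `azip_longest`, as a sketch only. It assumes the same `aiter` helper as above and the `fillvalue` keyword of `itertools.zip_longest`; `numbers` and `main` are hypothetical names for the demo.

```python
import asyncio


def aiter(x):
    # Same helper as above: accept sync or async iterables.
    if hasattr(x, "__aiter__"):
        return x.__aiter__()
    elif hasattr(x, "__anext__"):
        return x

    async def aiter_wrapper():
        for item in x:
            yield item

    return aiter_wrapper().__aiter__()


def azip_longest(*iterables, fillvalue=None):
    iterators = [aiter(x) for x in iterables]

    async def generator():
        exhausted = [False] * len(iterators)
        active = len(iterators)  # iterators that can still produce items
        while active:
            row = []
            for i, it in enumerate(iterators):
                if exhausted[i]:
                    row.append(fillvalue)
                    continue
                try:
                    row.append(await it.__anext__())
                except StopAsyncIteration:
                    exhausted[i] = True
                    active -= 1
                    row.append(fillvalue)
            if not active:
                break  # every input ran out on this pass; drop the all-fill row
            yield tuple(row)

    return generator().__aiter__()


async def numbers():  # hypothetical async iterable for the demo
    for n in (1, 2, 3):
        yield n


async def main():
    return [row async for row in azip_longest("ab", numbers(), fillvalue="-")]


rows = asyncio.run(main())
print(rows)
```

The sync counterpart would simply be `itertools.zip_longest`.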

All of the async functions could be made available only on Python 3.6+ (they rely on async generators), so the package could still be used by projects that support 3.5 or earlier.

Could also provide type hints that are properly affected by unasync as well.

AsyncOrSyncIterable[X] = Union[AsyncIterable[X], Iterable[X]]
SyncIterable[X] = Iterable[X]
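
The two lines above are shorthand rather than valid Python. A minimal sketch of how such aliases could be spelled with `typing`, assuming a type variable named `X`; the `total` function is a hypothetical example, and mapping one alias name to the other in generated code is the proposal here, not something this snippet does.

```python
from typing import AsyncIterable, Iterable, TypeVar, Union

X = TypeVar("X")

# Generic aliases: subscripting them substitutes X.
AsyncOrSyncIterable = Union[AsyncIterable[X], Iterable[X]]
SyncIterable = Iterable[X]


def total(items: SyncIterable[int]) -> int:
    # What the sync side of an annotated function might look like.
    return sum(items)


print(total([1, 2, 3]))
```

The async source would annotate parameters with `AsyncOrSyncIterable[int]`, and the generated sync code would use `SyncIterable[int]` in the same position.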

This should probably be a separate project from unasync, since unasync is primarily a build tool. I'm raising it here because it'd make sense for the tool to live under python-trio, maybe as unasync-utils?
