В рамках проекта создается:
- core framework для определения конвейерной обработки данных через проблемно-ориентированный язык (DSL);
- picture-pipelines реализация модельной задачи конвейерной обработки изображений;
Конвейер (workflow) представляет собой совокупность типизированных каналов (pipe) и узлов обработки (node).
Типизированный канал (pipe) - это линейное хранилище типизированных объектов и предоставляющий интерфейс:
- добавление нового объекта;
- добавление обработчика, реагирующего на добавление нового элемента;
Узел обработки (node) - это действующая единица конвейера, определяющая способ обработки входных каналов и добавления элементов в выходные каналы.
Состояние (контекст) конвейера - это совокупность состояний узлов обработки и типизированных каналов. За операцию изменяющую контекст стоит понимать любую атомарную операцию, изменяющую состояния узлов обработки или каналов.
Состояние узла обработки - это совокупность объектов, находящиеся в обработки в данном узле.
Состояние типизированного канала - это совокупность объектов хранящихся в канале.
Конвейер может находиться в следующих состояниях:
Состояние | Описание |
---|---|
Инициализирован | Дефолтное состояние созданного конвейера, когда происходят инициализация внутренних компонентов, исходя из описанного пользователем DSL его остановке. |
В работе | Состояние наступающее по действию пользователя, при котором идет независимое друг от друга исполнение узлов обработки типизированных каналов. |
В зависании | Состояние наступающее, когда за меньше чем N количество изменений состояний конвейера он переходит в такое же состояние. |
Завершен | Дефолтное состояние конвейера при принудительной его остановке пользователем. |
Требование | Описание |
---|---|
Узлы контейнера имеют наборы входных и выходных каналов | В рамках реализации требования планируется ограничение до 3 входных и выходных потоков |
Компоновка узлов и каналов поддается статическому контролю типизации | Нет возможности нарушить типизацию узлов на этапе написания кода |
Независимое исполнонение узлов | Исполнение узлов на виртуальных потоках с использованием определенного пула системных потоков |
Требование | Описание |
---|---|
Обобщенные части конвейеров | В рамках реализации требования планируется ограничение до 3 входных и выходных потоков |
Поддержка обработки циклов в графе | Вывод предупреждения при инициализации конвейера |
Обработка зависаний конвейера | Выброс исключений, когда конвейер, находящийся в работе, переходит в состояние зависания |
Корневое доменное слово, используется для определения частей конвеера и их взаимосвязей
Workflow(
dispatcher: CoroutineDispatcher,
countStackContext: Int,
enableSecurityDeadLock: Bool,
enabledWarringCyclePipe: Bool,
updateContext: (Workflow.Context)-> Unit) {
// Сборка конвейера в контексте WorkflowBuilder
}
Названия | Тип | Значение по умолчанию | Назначение |
---|---|---|---|
dispatcher | CoroutineDispatcher |
CoroutineDispatcher.Default | Параметр определяет контекст виртуальных потоков - корутин, в которых независимо исполняются узлы конвейера. |
countStackContext | Int |
10 | Параметр обозначает количество хранящиеся в стеке состояний. |
enableSecurityDeadLock | Bool |
false | Параметр включает проверку на состояния зависания конвейера. При выявлении изменения состояния конвейера выбрасывается исключение. |
enabledWarringCyclePipe | Bool |
false | Параметр включает проверку конвейера на этапе инициализации. При выявлении циклов в зависимостях между узлами, в консоли выводится предупреждение о возможных негативных последствиях. |
updateContext | (Workflow.Context)-> Unit |
{} | Функция обработчик, вызываемая при каждом изменении состоянии конвейера. |
Для создания шаблонов конвейеров (SharedWorkflow
) предлагается использовать функции, в параметрах которых обозначаются переиспользуемые компоненты, а возращаемым результатом будет SharedWorkflow
fun mySharedWorkflow(input: Pipe<Int>, output: Pipe<Int>, firstParam: Int): SharedWorkflow =
SharedWorkflow {
node(
name = "Пример",
input = input,
output = output
) { comsumer, producer ->
// logic
}
}
node(
name = "Первая нода",
input = Pair(pipe1, pipe2),
output = Pair(pipe3, pipe4)
) { consumerPipe1, cosumerPipe2, producerPipe3, producerPipe4 ->
// Define the action logic here
println("Executing action for node 'Первая нода'")
// e.g., Use producers and consumers as needed
}
Названия | Тип | Значение по умолчанию | Назначение |
---|---|---|---|
name | String |
Нет | Название узла, используемого в логгировании |
input | Pipe<T> | Pair<T,Q> | Triple<T,Q,S> |
Нет | Входящие в узел типизированные каналы |
output | Pipe<T> | Pair<T,Q> | Triple<T,Q,S> |
Нет | Выходящие из узла типизированные каналы |
Передается анонимная функция, в параметрах которых последовательно (как это указано в параметрах) передаются интерфейсы для взаимодействия с типизированным каналом.
Название | Назначение |
---|---|
start | Конвейер переходит в состояние В работе |
pause | Конвейер останавливает исполнение узлов |
stop | Конвейер перходит в состояние Завершен , очищается котекст узлов и каналов, принудительно завершаются все корутины |
Типизированный канал.
Внутренний класс, предоставляющий интерфейс для добавления обработчиков новых элементов в канале.
Название | Назначение |
---|---|
plus |
Сложение Consumer необходимо для создание атомарной операции чтения нового значения сразу из двух каналов |
Параметры | Назначение |
---|---|
action |
Функция обработчик всех занчений |
Параметры | Назначение |
---|---|
Функция одного атомарного чтения |
Внутренний класс, предоставляющий интерфейс для добавления элементов в канал.
Название | Назначение |
---|---|
plus |
Сложение Producer необходимо для создание атомарной операции чтения нового значения сразу из двух каналов |
Параметры | Назначение |
---|---|
value |
Значение передаваемое в канал |
Pipes расширение Koltin Channel.
Kotlin Channel концептуально очень похож на BlockingQueue. Одно ключевое отличие состоит в том, что вместо блокирующей put операции у него есть приостанавливающая send, а вместо блокирующей take операции у него есть приостанавливающая receive.
Pipeline - это шаблон, в котором одна сопрограмма создает, возможно, бесконечный, поток значений, а другая сопрограмма или сопрограммы потребляют этот поток, выполняют некоторую обработку и производят некоторые другие результаты.
Используются виртуальные потоки Kotlin или корутины.
Корутина — это экземпляр приостанавливаемого вычисления. Мы можем приостановить выполнение корутины, чтобы позволить другим корутинам отработать в том же потоке. В дальнейшем выполнение этой корутины может быть возобновлено в том же или даже в другом потоке. В момент приостановки работы корутины связанные с ней вычисления останавливаются, сохраняются в памяти и удаляются из потока, позволяя ему свободно заниматься другими задачами.
Вызов suspend-функции приостанавливает выполнение функции и позволяет потоку выполнять другие действия. Через некоторое время приостановленная функция может быть возобновлена в том же или другом потоке.
Каждая nodes запускается в отдельном корутине с возможностью управлять состоянием корутины.
Использовалась библиотека editimage
Для преобразований созданы 6 node