Skip to content

Tasks API #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions arangoasync/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
PermissionUpdateError,
ServerStatusError,
ServerVersionError,
TaskCreateError,
TaskDeleteError,
TaskGetError,
TaskListError,
TransactionAbortError,
TransactionCommitError,
TransactionExecuteError,
Expand Down Expand Up @@ -2193,6 +2197,148 @@ def response_handler(resp: Response) -> Json:

return await self._executor.execute(request, response_handler)

async def tasks(self) -> Result[Jsons]:
"""Fetches all existing tasks from the server.

Returns:
list: List of currently active server tasks.

Raises:
TaskListError: If the list cannot be retrieved.

References:
- `list-all-tasks <https://docs.arangodb.com/stable/develop/http-api/tasks/#list-all-tasks>`__
""" # noqa: E501
request = Request(method=Method.GET, endpoint="/_api/tasks")

def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise TaskListError(resp, request)
result: Jsons = self.deserializer.loads_many(resp.raw_body)
return result

return await self._executor.execute(request, response_handler)

async def task(self, task_id: str) -> Result[Json]:
"""Return the details of an active server task.

Args:
task_id (str) -> Server task ID.

Returns:
dict: Details of the server task.

Raises:
TaskGetError: If the task details cannot be retrieved.

References:
- `get-a-task <https://docs.arangodb.com/stable/develop/http-api/tasks/#get-a-task>`__
""" # noqa: E501
request = Request(method=Method.GET, endpoint=f"/_api/tasks/{task_id}")

def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise TaskGetError(resp, request)
result: Json = self.deserializer.loads(resp.raw_body)
return result

return await self._executor.execute(request, response_handler)

async def create_task(
self,
command: str,
task_id: Optional[str] = None,
name: Optional[str] = None,
offset: Optional[int] = None,
params: Optional[Json] = None,
period: Optional[int] = None,
) -> Result[Json]:
"""Create a new task.

Args:
command (str): The JavaScript code to be executed.
task_id (str | None): Optional task ID. If not provided, the server will
generate a unique ID.
name (str | None): The name of the task.
offset (int | None): The offset in seconds after which the task should
start executing.
params (dict | None): Parameters to be passed to the command.
period (int | None): The number of seconds between the executions.

Returns:
dict: Details of the created task.

Raises:
TaskCreateError: If the task cannot be created.

References:
- `create-a-task <https://docs.arangodb.com/stable/develop/http-api/tasks/#create-a-task>`__
- `create-a-task-with-id <https://docs.arangodb.com/stable/develop/http-api/tasks/#create-a-task-with-id>`__
""" # noqa: E501
data: Json = {"command": command}
if name is not None:
data["name"] = name
if offset is not None:
data["offset"] = offset
if params is not None:
data["params"] = params
if period is not None:
data["period"] = period

if task_id is None:
request = Request(
method=Method.POST,
endpoint="/_api/tasks",
data=self.serializer.dumps(data),
)
else:
request = Request(
method=Method.PUT,
endpoint=f"/_api/tasks/{task_id}",
data=self.serializer.dumps(data),
)

def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise TaskCreateError(resp, request)
result: Json = self.deserializer.loads(resp.raw_body)
return result

return await self._executor.execute(request, response_handler)

async def delete_task(
self,
task_id: str,
ignore_missing: bool = False,
) -> Result[bool]:
"""Delete a server task.

Args:
task_id (str): Task ID.
ignore_missing (bool): If `True`, do not raise an exception if the
task does not exist.

Returns:
bool: `True` if the task was deleted successfully, `False` if the
task was not found and **ignore_missing** was set to `True`.

Raises:
TaskDeleteError: If the operation fails.

References:
- `delete-a-task <https://docs.arangodb.com/stable/develop/http-api/tasks/#delete-a-task>`__
""" # noqa: E501
request = Request(method=Method.DELETE, endpoint=f"/_api/tasks/{task_id}")

def response_handler(resp: Response) -> bool:
if resp.is_success:
return True
if resp.status_code == HTTP_NOT_FOUND and ignore_missing:
return False
raise TaskDeleteError(resp, request)

return await self._executor.execute(request, response_handler)


class StandardDatabase(Database):
"""Standard database API wrapper.
Expand Down
16 changes: 16 additions & 0 deletions arangoasync/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,22 @@ class SortValidationError(ArangoClientError):
"""Invalid sort parameters."""


class TaskCreateError(ArangoServerError):
"""Failed to create server task."""


class TaskDeleteError(ArangoServerError):
"""Failed to delete server task."""


class TaskGetError(ArangoServerError):
"""Failed to retrieve server task details."""


class TaskListError(ArangoServerError):
"""Failed to retrieve server tasks."""


class TransactionAbortError(ArangoServerError):
"""Failed to abort transaction."""

Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Contents
compression
serialization
backup
task
errors
errno
logging
Expand Down
51 changes: 51 additions & 0 deletions docs/task.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
Tasks
-----

ArangoDB can schedule user-defined Javascript snippets as one-time or periodic
(re-scheduled after each execution) tasks. Tasks are executed in the context of
the database they are defined in.

**Example:**

.. code-block:: python

from arangoasync import ArangoClient
from arangoasync.auth import Auth

# Initialize the client for ArangoDB.
async with ArangoClient(hosts="http://localhost:8529") as client:
auth = Auth(username="root", password="passwd")

# Connect to "test" database as root user.
db = await client.db("test", auth=auth)

# Create a new task which simply prints parameters.
await db.create_task(
name="test_task",
command="""
var task = function(params){
var db = require('@arangodb');
db.print(params);
}
task(params);
""",
params={"foo": "bar"},
offset=300,
period=10,
task_id="001"
)

# List all active tasks
tasks = await db.tasks()

# Retrieve details of a task by ID.
details = await db.task("001")

# Delete an existing task by ID.
await db.delete_task('001', ignore_missing=True)


.. note::
When deleting a database, any tasks that were initialized under its context
remain active. It is therefore advisable to delete any running tasks before
deleting the database.
13 changes: 13 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,19 @@ async def teardown():
verify=False,
)

# Remove all tasks
test_tasks = [
task
for task in await sys_db.tasks()
if task["name"].startswith("test_task")
]
await asyncio.gather(
*(
sys_db.delete_task(task["id"], ignore_missing=True)
for task in test_tasks
)
)

# Remove all test users.
tst_users = [
user["user"]
Expand Down
18 changes: 18 additions & 0 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,21 @@ def generate_analyzer_name():
str: Random analyzer name.
"""
return f"test_analyzer_{uuid4().hex}"


def generate_task_name():
"""Generate and return a random task name.

Returns:
str: Random task name.
"""
return f"test_task_{uuid4().hex}"


def generate_task_id():
"""Generate and return a random task ID.

Returns:
str: Random task ID
"""
return f"test_task_id_{uuid4().hex}"
79 changes: 79 additions & 0 deletions tests/test_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import pytest

from arangoasync.exceptions import (
TaskCreateError,
TaskDeleteError,
TaskGetError,
TaskListError,
)
from tests.helpers import generate_task_id, generate_task_name


@pytest.mark.asyncio
async def test_task_management(sys_db, bad_db):
# This test intentionally uses the system database because cleaning up tasks is
# easier there.

test_command = 'require("@arangodb").print(params);'

# Test errors
with pytest.raises(TaskCreateError):
await bad_db.create_task(command=test_command)
with pytest.raises(TaskGetError):
await bad_db.task("non_existent_task_id")
with pytest.raises(TaskListError):
await bad_db.tasks()
with pytest.raises(TaskDeleteError):
await bad_db.delete_task("non_existent_task_id")

# Create a task with a random ID
task_name = generate_task_name()
new_task = await sys_db.create_task(
name=task_name,
command=test_command,
params={"foo": 1, "bar": 2},
offset=1,
)
assert new_task["name"] == task_name
task_id = new_task["id"]
assert await sys_db.task(task_id) == new_task

# Delete task
assert await sys_db.delete_task(task_id) is True

# Create a task with a specific ID
task_name = generate_task_name()
task_id = generate_task_id()
new_task = await sys_db.create_task(
name=task_name,
command=test_command,
params={"foo": 1, "bar": 2},
offset=1,
period=10,
task_id=task_id,
)
assert new_task["name"] == task_name
assert new_task["id"] == task_id

# Try to create a duplicate task
with pytest.raises(TaskCreateError):
await sys_db.create_task(
name=task_name,
command=test_command,
params={"foo": 1, "bar": 2},
task_id=task_id,
)

# Test get missing task
with pytest.raises(TaskGetError):
await sys_db.task(generate_task_id())

# Test list tasks
tasks = await sys_db.tasks()
assert len(tasks) == 1

# Delete tasks
assert await sys_db.delete_task(task_id) is True
assert await sys_db.delete_task(task_id, ignore_missing=True) is False
with pytest.raises(TaskDeleteError):
await sys_db.delete_task(task_id)
Loading