aboutsummaryrefslogtreecommitdiffstats
path: root/modules/by-name/ti/timewarrior/taskwarrior_hooks
diff options
context:
space:
mode:
Diffstat (limited to 'modules/by-name/ti/timewarrior/taskwarrior_hooks')
-rwxr-xr-xmodules/by-name/ti/timewarrior/taskwarrior_hooks/on-modify.track-timewarrior.py124
-rwxr-xr-xmodules/by-name/ti/timewarrior/taskwarrior_hooks/on-modify.track-total-active-time.py139
2 files changed, 263 insertions, 0 deletions
diff --git a/modules/by-name/ti/timewarrior/taskwarrior_hooks/on-modify.track-timewarrior.py b/modules/by-name/ti/timewarrior/taskwarrior_hooks/on-modify.track-timewarrior.py
new file mode 100755
index 00000000..0bef8bc2
--- /dev/null
+++ b/modules/by-name/ti/timewarrior/taskwarrior_hooks/on-modify.track-timewarrior.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python3
+
+###############################################################################
+#
+# Copyright 2016 - 2021, 2023, Gothenburg Bit Factory
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+# https://www.opensource.org/licenses/mit-license.php
+#
+###############################################################################
+
+import json
+import subprocess
+import sys
+
+# Hook should extract all the following for use as Timewarrior tags:
+# UUID
+# Project
+# Tags
+# Description
+# UDAs
+
+try:
+ input_stream = sys.stdin.buffer
+except AttributeError:
+ input_stream = sys.stdin
+
+
+MAX_ACTIVE = 1
+
+
+def extract_tags_from(json_obj) -> [str]:
+ # Extract attributes for use as tags.
+ tags = [json_obj["description"]]
+
+ if "project" in json_obj:
+ tags.append(json_obj["project"])
+
+ if "tags" in json_obj:
+ if type(json_obj["tags"]) is str:
+ # Usage of tasklib (e.g. in taskpirate) converts the tag list into a string
+ # If this is the case, convert it back into a list first
+ # See https://github.com/tbabej/taskpirate/issues/11
+ tags.extend(json_obj["tags"].split(","))
+ else:
+ tags.extend(json_obj["tags"])
+
+ return tags
+
+
+def extract_annotation_from(json_obj):
+ if "annotations" not in json_obj:
+ return "''"
+
+ return json_obj["annotations"][0]["description"]
+
+
+def main(old, new):
+ start_or_stop = ""
+
+ # Started task.
+ if "start" in new and "start" not in old:
+ # Prevent this task from starting if "task +ACTIVE count" is greater than "MAX_ACTIVE".
+ p = subprocess.Popen(
+ ["task", "+ACTIVE", "status:pending", "count", "rc.verbose:off"],
+ stdout=subprocess.PIPE,
+ )
+ out, err = p.communicate()
+ count = int(out.rstrip())
+ if count >= MAX_ACTIVE:
+ print(
+ f"Only {MAX_ACTIVE} task(s) can be active at a time.",
+ )
+ sys.exit(1)
+ else:
+ start_or_stop = "start"
+
+ # Stopped task.
+ elif ("start" not in new or "end" in new) and "start" in old:
+ start_or_stop = "stop"
+
+ if start_or_stop:
+ tags = extract_tags_from(new)
+
+ subprocess.call(["timew", start_or_stop] + tags + [":yes"])
+
+ # Modifications to task other than start/stop
+ elif "start" in new and "start" in old:
+ old_tags = extract_tags_from(old)
+ new_tags = extract_tags_from(new)
+
+ if old_tags != new_tags:
+ subprocess.call(["timew", "untag", "@1"] + old_tags + [":yes"])
+ subprocess.call(["timew", "tag", "@1"] + new_tags + [":yes"])
+
+ old_annotation = extract_annotation_from(old)
+ new_annotation = extract_annotation_from(new)
+
+ if old_annotation != new_annotation:
+ subprocess.call(["timew", "annotate", "@1", new_annotation])
+
+
+if __name__ == "__main__":
+ old = json.loads(input_stream.readline().decode("utf-8", errors="replace"))
+ new = json.loads(input_stream.readline().decode("utf-8", errors="replace"))
+ print(json.dumps(new))
+ main(old, new)
diff --git a/modules/by-name/ti/timewarrior/taskwarrior_hooks/on-modify.track-total-active-time.py b/modules/by-name/ti/timewarrior/taskwarrior_hooks/on-modify.track-total-active-time.py
new file mode 100755
index 00000000..b98e02fc
--- /dev/null
+++ b/modules/by-name/ti/timewarrior/taskwarrior_hooks/on-modify.track-total-active-time.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2016-present Arctic Ice Studio <development@arcticicestudio.com>
+# Copyright (C) 2016-present Sven Greb <development@svengreb.de>
+
+# Project: igloo
+# Repository: https://github.com/arcticicestudio/igloo
+# License: MIT
+# References:
+# https://taskwarrior.org/docs
+# task(1)
+
+"""A Taskwarrior hook to track the total active time of a task.
+
+The tracked time is stored in a UDA task duration attribute named ``totalactivetime`` of type ``duration`` holding the total number of seconds the task was
+active. The tracked time can then be included in any report by adding the ``totalactivetime`` column.
+
+By default, this plugin allows to have one task active at a time. This can be changed by setting ``max_active_tasks`` in ``.taskrc`` to a value greater than
+``1``.
+
+Note:
+ Note that this hook is only compatible with Taskwarrior version greater or equal to 2.4!
+
+This hook is a fork from `kostajh/taskwarrior-time-tracking-hook`_
+
+.. _kostajh/taskwarrior-time-tracking-hook:
+ https://github.com/kostajh/taskwarrior-time-tracking-hook
+"""
+
+import datetime
+import json
+import re
+import sys
+import subprocess
+from typing import TypeVar
+
+TIME_FORMAT = "%Y%m%dT%H%M%SZ"
+UDA_KEY = "total_active_time"
+
+MAX_ACTIVE = 1
+
+"""Compiled regular expression for the duration as ISO-8601 formatted string."""
+ISO8601DURATION = re.compile("P((\d*)Y)?((\d*)M)?((\d*)D)?T((\d*)H)?((\d*)M)?((\d*)S)?")
+
+"""The duration type either as integer (in seconds), as ISO-8601 formatted string ("PT1H10M31S") or the seconds suffixed with "seconds"."""
+DurationType = TypeVar("DurationType", str, int)
+
+
+def duration_str_to_time_delta(duration_str: DurationType) -> datetime.timedelta:
+ """Converts duration string into a timedelta object.
+
+ :param duration_str: The duration
+ :return: The duration as timedelta object
+ """
+ if duration_str.startswith("P"):
+ match = ISO8601DURATION.match(duration_str)
+ if match:
+ year = match.group(2)
+ month = match.group(4)
+ day = match.group(6)
+ hour = match.group(8)
+ minute = match.group(10)
+ second = match.group(12)
+ value = 0
+ if second:
+ value += int(second)
+ if minute:
+ value += int(minute) * 60
+ if hour:
+ value += int(hour) * 3600
+ if day:
+ value += int(day) * 3600 * 24
+ if month:
+ # Assume a month is 30 days for now.
+ value += int(month) * 3600 * 24 * 30
+ if year:
+ # Assume a year is 365 days for now.
+ value += int(year) * 3600 * 24 * 365
+ else:
+ value = int(duration_str)
+ elif duration_str.endswith("seconds"):
+ value = int(duration_str.rstrip("seconds"))
+ else:
+ value = int(duration_str)
+ return datetime.timedelta(seconds=value)
+
+
+def main():
+ original = json.loads(sys.stdin.readline())
+ modified = json.loads(sys.stdin.readline())
+
+ # An active task has just been started.
+ if "start" in modified and "start" not in original:
+ # Prevent this task from starting if "task +ACTIVE count" is greater than "MAX_ACTIVE".
+ p = subprocess.Popen(
+ ["task", "+ACTIVE", "status:pending", "count", "rc.verbose:off"],
+ stdout=subprocess.PIPE,
+ )
+ out, err = p.communicate()
+ count = int(out.rstrip())
+ if count >= MAX_ACTIVE:
+ print(
+ "Only %d task(s) can be active at a time. "
+ "See 'max_active_tasks' in .taskrc." % MAX_ACTIVE
+ )
+ sys.exit(1)
+
+ # An active task has just been stopped.
+ if "start" in original and "start" not in modified:
+ # Calculate the elapsed time.
+ start = datetime.datetime.strptime(original["start"], TIME_FORMAT)
+ end = datetime.datetime.utcnow()
+
+ if UDA_KEY not in modified:
+ modified[UDA_KEY] = 0
+
+ this_duration = end - start
+ total_duration = this_duration + duration_str_to_time_delta(
+ str(modified[UDA_KEY])
+ )
+ print(
+ "Total Time Tracked: %s (%s in this instance)"
+ % (total_duration, this_duration)
+ )
+ modified[UDA_KEY] = (
+ str(int(total_duration.days * (60 * 60 * 24) + total_duration.seconds))
+ + "seconds"
+ )
+
+ return json.dumps(modified, separators=(",", ":"))
+
+
+def cmdline():
+ sys.stdout.write(main())
+
+
+if __name__ == "__main__":
+ cmdline()