#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2016-present Arctic Ice Studio # Copyright (C) 2016-present Sven Greb # Project: igloo # Repository: https://github.com/arcticicestudio/igloo # License: MIT # References: # https://taskwarrior.org/docs # task(1) """A Taskwarrior hook to track the total active time of a task. The tracked time is stored in a UDA task duration attribute named ``totalactivetime`` of type ``duration`` holding the total number of seconds the task was active. The tracked time can then be included in any report by adding the ``totalactivetime`` column. By default, this plugin allows to have one task active at a time. This can be changed by setting ``max_active_tasks`` in ``.taskrc`` to a value greater than ``1``. Note: Note that this hook is only compatible with Taskwarrior version greater or equal to 2.4! This hook is a fork from `kostajh/taskwarrior-time-tracking-hook`_ .. _kostajh/taskwarrior-time-tracking-hook: https://github.com/kostajh/taskwarrior-time-tracking-hook """ import datetime import json import sys import subprocess from typing import TypeVar TIME_FORMAT = "%Y%m%dT%H%M%SZ" UDA_KEY = "total_active_time" MAX_ACTIVE = 1 """The duration type either as integer (in seconds), as ISO-8601 formatted string ("PT1H10M31S") or the seconds suffixed with "seconds".""" DurationType = TypeVar("DurationType", str, int) def duration_str_to_time_delta(duration_str: DurationType) -> datetime.timedelta: """Converts duration string into a timedelta object. :param duration_str: The duration :return: The duration as timedelta object """ if duration_str.startswith("P"): value = datetime.fromisoformat(duration_str) elif duration_str.endswith("seconds"): value = int(duration_str.rstrip("seconds")) else: value = int(duration_str) return datetime.timedelta(seconds=value) def main(): original = json.loads(sys.stdin.readline()) modified = json.loads(sys.stdin.readline()) # An active task has just been started. if "start" in modified and "start" not in original: # Prevent this task from starting if "task +ACTIVE count" is greater than "MAX_ACTIVE". p = subprocess.Popen( ["task", "+ACTIVE", "status:pending", "count", "rc.verbose:off"], stdout=subprocess.PIPE, ) out, err = p.communicate() count = int(out.rstrip()) if count >= MAX_ACTIVE: print( "Only %d task(s) can be active at a time. " "See 'max_active_tasks' in .taskrc." % MAX_ACTIVE ) sys.exit(1) # An active task has just been stopped. if "start" in original and "start" not in modified: # Calculate the elapsed time. start = datetime.datetime.strptime(original["start"], TIME_FORMAT) end = datetime.datetime.utcnow() if UDA_KEY not in modified: modified[UDA_KEY] = 0 this_duration = end - start total_duration = this_duration + duration_str_to_time_delta( str(modified[UDA_KEY]) ) print( "Total Time Tracked: %s (%s in this instance)" % (total_duration, this_duration) ) modified[UDA_KEY] = ( str(int(total_duration.days * (60 * 60 * 24) + total_duration.seconds)) + "seconds" ) return json.dumps(modified, separators=(",", ":")) def cmdline(): sys.stdout.write(main()) if __name__ == "__main__": cmdline()