1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2016-present Arctic Ice Studio <development@arcticicestudio.com>
# Copyright (C) 2016-present Sven Greb <development@svengreb.de>
# Project: igloo
# Repository: https://github.com/arcticicestudio/igloo
# License: MIT
# References:
# https://taskwarrior.org/docs
# task(1)
"""A Taskwarrior hook to track the total active time of a task.
The tracked time is stored in a UDA task duration attribute named ``totalactivetime`` of type ``duration`` holding the total number of seconds the task was
active. The tracked time can then be included in any report by adding the ``totalactivetime`` column.
By default, this plugin allows to have one task active at a time. This can be changed by setting ``max_active_tasks`` in ``.taskrc`` to a value greater than
``1``.
Note:
This hook requires Python 3 and the `taskw`_ package to be installed which provides the python bindings for Taskwarrior!
Also note that this hook is only compatible with Taskwarrior version greater or equal to 2.4!
This hook is a fork from `kostajh/taskwarrior-time-tracking-hook`_
.. _taskw:
https://pypi.python.org/pypi/taskw
.. _kostajh/taskwarrior-time-tracking-hook:
https://github.com/kostajh/taskwarrior-time-tracking-hook
"""
import datetime
import json
import re
import sys
import subprocess
from taskw import TaskWarrior
from typing import TypeVar
TIME_FORMAT = "%Y%m%dT%H%M%SZ"
UDA_KEY = "total_active_time"
w = TaskWarrior(config_filename=sys.argv[4].replace("rc:", ""))
config = w.load_config(config_filename=sys.argv[4].replace("rc:", ""))
if "max_active_tasks" in config:
MAX_ACTIVE = int(config["max_active_tasks"])
else:
MAX_ACTIVE = 1
"""Compiled regular expression for the duration as ISO-8601 formatted string."""
ISO8601DURATION = re.compile("P((\d*)Y)?((\d*)M)?((\d*)D)?T((\d*)H)?((\d*)M)?((\d*)S)?")
"""The duration type either as integer (in seconds), as ISO-8601 formatted string ("PT1H10M31S") or the seconds suffixed with "seconds"."""
DurationType = TypeVar("DurationType", str, int)
def duration_str_to_time_delta(duration_str: DurationType) -> datetime.timedelta:
"""Converts duration string into a timedelta object.
:param duration_str: The duration
:return: The duration as timedelta object
"""
if duration_str.startswith("P"):
match = ISO8601DURATION.match(duration_str)
if match:
year = match.group(2)
month = match.group(4)
day = match.group(6)
hour = match.group(8)
minute = match.group(10)
second = match.group(12)
value = 0
if second:
value += int(second)
if minute:
value += int(minute) * 60
if hour:
value += int(hour) * 3600
if day:
value += int(day) * 3600 * 24
if month:
# Assume a month is 30 days for now.
value += int(month) * 3600 * 24 * 30
if year:
# Assume a year is 365 days for now.
value += int(year) * 3600 * 24 * 365
else:
value = int(duration_str)
elif duration_str.endswith("seconds"):
value = int(duration_str.rstrip("seconds"))
else:
value = int(duration_str)
return datetime.timedelta(seconds=value)
def main():
original = json.loads(sys.stdin.readline())
modified = json.loads(sys.stdin.readline())
# An active task has just been started.
if "start" in modified and "start" not in original:
# Prevent this task from starting if "task +ACTIVE count" is greater than "MAX_ACTIVE".
p = subprocess.Popen(["task", "+ACTIVE", "status:pending", "count", "rc.verbose:off"], stdout=subprocess.PIPE)
out, err = p.communicate()
count = int(out.rstrip())
if count >= MAX_ACTIVE:
print("Only %d task(s) can be active at a time. "
"See 'max_active_tasks' in .taskrc." % MAX_ACTIVE)
sys.exit(1)
# An active task has just been stopped.
if "start" in original and "start" not in modified:
# Calculate the elapsed time.
start = datetime.datetime.strptime(original["start"], TIME_FORMAT)
end = datetime.datetime.utcnow()
if UDA_KEY not in modified:
modified[UDA_KEY] = 0
this_duration = (end - start)
total_duration = (this_duration + duration_str_to_time_delta(str(modified[UDA_KEY])))
print("Total Time Tracked: %s (%s in this instance)" % (total_duration, this_duration))
modified[UDA_KEY] = str(int(total_duration.days * (60 * 60 * 24) + total_duration.seconds)) + "seconds"
return json.dumps(modified, separators=(",", ":"))
def cmdline():
sys.stdout.write(main())
if __name__ == "__main__":
cmdline()
|