diff options
author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2024-05-23 13:26:22 +0200 |
---|---|---|
committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2024-05-23 13:26:22 +0200 |
commit | 204731c0a69136c9cebcb54f1afecf5145e26bbe (patch) | |
tree | fc9132e5dc74e4a8e1327cdd411839a90f9410aa /pkgs/by-name/up/update-vim-plugins/update_vim_plugins/update.py | |
parent | refactor(sys): Modularize and move to `modules/system` or `pkgs` (diff) | |
download | nixos-config-204731c0a69136c9cebcb54f1afecf5145e26bbe.zip |
refactor(pkgs): Categorize into `by-name` shards
This might not be the perfect way to organize a package set -- especially if the set is not nearly the size of nixpkgs -- but it is _at_ least a way of organization.
Diffstat (limited to 'pkgs/by-name/up/update-vim-plugins/update_vim_plugins/update.py')
-rw-r--r-- | pkgs/by-name/up/update-vim-plugins/update_vim_plugins/update.py | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/pkgs/by-name/up/update-vim-plugins/update_vim_plugins/update.py b/pkgs/by-name/up/update-vim-plugins/update_vim_plugins/update.py new file mode 100644 index 00000000..7eb3eeb4 --- /dev/null +++ b/pkgs/by-name/up/update-vim-plugins/update_vim_plugins/update.py @@ -0,0 +1,212 @@ +import subprocess +from random import shuffle +from cleo.helpers import argument, option +from cleo.commands.command import Command +from concurrent.futures import ThreadPoolExecutor, as_completed + +from pprint import pprint + +from .plugin import plugin_from_spec + +from .helpers import read_manifest_to_spec, get_const +from .helpers import JSON_FILE, PLUGINS_LIST_FILE, PKGS_FILE + +import json +import jsonpickle + +jsonpickle.set_encoder_options("json", sort_keys=True) + + +class UpdateCommand(Command): + name = "update" + description = "Generate nix module from input file" + arguments = [argument("plug_dir", description="Path to the plugin directory", optional=False)] + options = [ + option("all", "a", description="Update all plugins. Else only update new plugins", flag=True), + option("dry-run", "d", description="Show which plugins would be updated", flag=True), + ] + + def handle(self): + """Main command function""" + + plug_dir = self.argument("plug_dir") + self.specs = read_manifest_to_spec(plug_dir) + + if self.option("all"): + # update all plugins + spec_list = self.specs + known_plugins = [] + else: + # filter plugins we already know + spec_list = self.specs + + with open(get_const(JSON_FILE, plug_dir), "r") as json_file: + data = json.load(json_file) + + known_specs = list(filter(lambda x: x.line in data, spec_list)) + known_plugins = [jsonpickle.decode(data[x.line]) for x in known_specs] + + spec_list = list(filter(lambda x: x.line not in data, spec_list)) + + if self.option("dry-run"): + self.line("<comment>These plugins would be updated</comment>") + pprint(spec_list) + self.line(f"<info>Total:</info> {len(spec_list)}") + exit(0) + + processed_plugins, failed_plugins, failed_but_known = self.process_manifest(spec_list, plug_dir) + + processed_plugins += known_plugins # add plugins from .plugins.json + processed_plugins: list = sorted(set(processed_plugins)) # remove duplicates based only on source line + + self.check_duplicates(processed_plugins) + + if failed_plugins != []: + self.line("<error>Not processed:</error> The following plugins could not be updated") + for s, e in failed_plugins: + self.line(f" - {s!r} - {e}") + + if failed_but_known != []: + self.line( + "<error>Not updated:</error> The following plugins could not be updated but an older version is known" + ) + for s, e in failed_but_known: + self.line(f" - {s!r} - {e}") + + # update plugin "database" + self.write_plugins_json(processed_plugins, plug_dir) + + # generate output + self.write_plugins_nix(processed_plugins, plug_dir) + + self.write_plugins_markdown(processed_plugins, plug_dir) + + self.line("<comment>Done</comment>") + + def write_plugins_markdown(self, plugins, plug_dir): + """Write the list of all plugins to PLUGINS_LIST_FILE in markdown""" + + plugins.sort() + + self.line("<info>Updating plugins.md</info>") + + header = f" - Plugin count: {len(plugins)}\n\n| Repo | Last Update | Nix package name | Last checked |\n|:---|:---|:---|:---|\n" + + with open(get_const(PLUGINS_LIST_FILE, plug_dir), "w") as file: + file.write(header) + for plugin in plugins: + file.write(f"{plugin.to_markdown()}\n") + + def write_plugins_nix(self, plugins, plug_dir): + self.line("<info>Generating nix output</info>") + + plugins.sort() + + header = "{ lib, buildVimPlugin, fetchurl, fetchgit }: {" + footer = "}" + + with open(get_const(PKGS_FILE, plug_dir), "w") as file: + file.write(header) + for plugin in plugins: + file.write(f"{plugin.to_nix()}\n") + file.write(footer) + + self.line("<info>Formatting nix output</info>") + + subprocess.run( + ["alejandra", get_const(PKGS_FILE, plug_dir)], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + def write_plugins_json(self, plugins, plug_dir): + self.line("<info>Storing results in .plugins.json</info>") + + plugins.sort() + + with open(get_const(JSON_FILE, plug_dir), "r+") as json_file: + data = json.load(json_file) + + for plugin in plugins: + data.update({f"{plugin.source_line}": plugin.to_json()}) + + json_file.seek(0) + json_file.write(json.dumps(data, indent=2, sort_keys=True)) + json_file.truncate() + + def check_duplicates(self, plugins): + """check for duplicates in proccesed_plugins""" + error = False + for i, plugin in enumerate(plugins): + for p in plugins[i + 1 :]: + if plugin.name == p.name: + self.line( + f"<error>Error:</error> The following two lines produce the same plugin name:\n - {plugin.source_line}\n - {p.source_line}\n -> {p.name}" + ) + error = True + + # We want to exit if the resulting nix file would be broken + # But we want to go through all plugins before we do so + if error: + exit(1) + + def generate_plugin(self, spec, i, size, plug_dir): + debug_string = "" + + processed_plugin = None + failed_but_known = None + failed_plugin = None + try: + debug_string += f" - <info>({i+1}/{size}) Processing</info> {spec!r}\n" + vim_plugin = plugin_from_spec(spec) + debug_string += f" • <comment>Success</comment> {vim_plugin!r}\n" + processed_plugin = vim_plugin + except Exception as e: + debug_string += f" • <error>Error:</error> Could not update <info>{spec.name}</info>. Keeping old values. Reason: {e}\n" + with open(get_const(JSON_FILE, plug_dir), "r") as json_file: + data = json.load(json_file) + + plugin_json = data.get(spec.line) + if plugin_json: + vim_plugin = jsonpickle.decode(plugin_json) + processed_plugin = vim_plugin + failed_but_known = (vim_plugin, e) + else: + debug_string += f" • <error>Error:</error> No entries for <info>{spec.name}</info> in '.plugins.json'. Skipping...\n" + failed_plugin = (spec, e) + + self.line(debug_string.strip()) + + return processed_plugin, failed_plugin, failed_but_known + + def process_manifest(self, spec_list, plug_dir): + """Read specs in 'spec_list' and generate plugins""" + + size = len(spec_list) + + # We have to assume that we will reach an api limit. Therefore + # we randomize the spec list to give every entry the same change to be updated and + # not favor those at the start of the list + shuffle(spec_list) + + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(self.generate_plugin, spec, i, size, plug_dir) for i, spec in enumerate(spec_list) + ] + results = [future.result() for future in as_completed(futures)] + + processed_plugins = [r[0] for r in results] + failed_plugins = [r[1] for r in results] + failed_but_known = [r[2] for r in results] + + processed_plugins = list(filter(lambda x: x is not None, processed_plugins)) + failed_plugins = list(filter(lambda x: x is not None, failed_plugins)) + failed_but_known = list(filter(lambda x: x is not None, failed_but_known)) + + processed_plugins.sort() + failed_plugins.sort() + failed_but_known.sort() + + assert len(processed_plugins) == len(spec_list) - len(failed_plugins) + + return processed_plugins, failed_plugins, failed_but_known |