Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache functionality to improve lookup executing times #18

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions lookup_plugins/bitwarden.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,17 @@
- Items from Bitwarden vault
"""

# Global variables
bw = None


class Bitwarden(object):
def __init__(self, path):
self._cli_path = path
self._bw_session = ""
self._cache = dict()
self._logged_in = None

try:
check_output([self._cli_path, "--version"])
except OSError:
Expand All @@ -87,11 +93,24 @@ def cli_path(self):

@property
def logged_in(self):
# Parse Bitwarden status to check if logged in
if self.status() == 'unlocked':
return True
else:
return False
if self._logged_in is None:
# Parse Bitwarden status to check if logged in
self._logged_in = (self.status() == 'unlocked')

return self._logged_in

def cache(func):
def inner(*args, **kwargs):
self = args[0]
key = '_'.join(args[1:])

if key not in self._cache:
value = func(*args, **kwargs)
self._cache[key] = value

return self._cache[key]

return inner

def _run(self, args):
my_env = os.environ.copy()
Expand Down Expand Up @@ -123,6 +142,7 @@ def _run(self, args):
return out.strip()

def sync(self):
self._cache = dict() # Clear cache to prevent using old values in cache
self._run(['sync'])

def status(self):
Expand All @@ -132,13 +152,16 @@ def status(self):
raise AnsibleError("Error decoding Bitwarden status: %s" % e)
return data['status']

@cache
def get_entry(self, key, field):
return self._run(["get", field, key])

@cache
def get_notes(self, key):
data = json.loads(self.get_entry(key, 'item'))
return data['notes']

@cache
def get_custom_field(self, key, field):
data = json.loads(self.get_entry(key, 'item'))
return next(x for x in data['fields'] if x['name'] == field)['value']
Expand All @@ -152,7 +175,10 @@ def get_attachments(self, key, itemid, output):
class LookupModule(LookupBase):

def run(self, terms, variables=None, **kwargs):
bw = Bitwarden(path=kwargs.get('path', 'bw'))
global bw

if not bw:
bw = Bitwarden(path=kwargs.get('path', 'bw'))

if not bw.logged_in:
raise AnsibleError("Not logged into Bitwarden: please run "
Expand Down