# Listing metadata from the file viewer: 435 lines, 17 KiB, Python
import sqlite3
|
|
from functools import total_ordering
|
|
|
|
class Session:
    """Interactive command session on top of the piano repertoire database.

    A session remembers the active user (``self.user``; ``-1`` means that no
    user is active), maps command names to ``(handler, help text)`` pairs
    (``self.commands``) and keeps the text produced by the most recent
    command (``self.resultstring``).

    Every handler takes the list of argument strings typed after the command
    name and returns the text to show to the user.  Values typed by the user
    are converted to ``int`` or bound as SQL parameters; they are never
    pasted into SQL text.
    """

    def __init__(self, db_agent):
        """Create a session without an active user.

        db_agent: object offering ``execute(sql, parameters)`` like a
            ``sqlite3`` cursor; all database access goes through it.
        """
        self.user = -1
        self.db_agent = db_agent
        self.resultstring = 'no results yet'
        self.commands = {
            'a': (self.set_user, 'Set an active user\n Usage: a number_of_user'),
            'c': (self.show_composers, 'Shows composers'),
            'delete_my_user_record_including_all_mastered_works': (self.delete_user, 'DangerZone: deletes user and all marks of mastered works'),
            'h': (self.show_help, 'Displays this help document'),
            'm': (self.mark_work_as_mastered, 'Marks work or movement as mastered for the active user\n Usage: m number_of_work, m number_of_work number_of_movement'),
            'n': (self.create_new_user, 'Creates new user\n Usage: n firstname name'),
            'r': (self.remove_work_as_mastered, 'Unmarks work or movement as mastered for the active user\n Usage: r number_of_work, r number_of_work number_of_movement'),
            's': (self.show_works, 'Show the works stored in the database\n Usage: s, s number_of_composer'),
            'sm': (self.show_mastered, 'Show mastered movements for activated user'),
            'smo': (self.show_mastered_opus, 'Show mastered movements for activated user, sorted by opus.'),
            'sw': (self.show_movements, 'Show movements of a work\n Usage: sw work_number'),
            'su': (self.show_users, 'Show all users'),
            'q': (self.quit, 'Quits the program'),
        }

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_id(arguments, position=0):
        """Return ``arguments[position]`` as ``int``; ``None`` if absent or not a number."""
        try:
            return int(arguments[position])
        except (IndexError, ValueError):
            return None

    def _usage(self, command):
        """Return the message shown when ``command`` got unusable arguments."""
        return f'Invalid or missing arguments.\n{command}: {self.commands[command][1]}'

    def _load_work(self, work_id):
        """Return the work stored under ``work_id``, or ``None`` if there is no such row."""
        found = self.db_agent.execute(
            'SELECT 1 FROM work WHERE id = ?', (work_id,)
        ).fetchone()
        if found is None:
            return None
        return work_under_id(work_id, self.db_agent)

    def _resolve_targets(self, arguments, command):
        """Turn ``work [movement]`` arguments into ``(work, movement_numbers, error)``.

        On success ``error`` is ``''``.  On failure ``work`` is ``None`` and
        ``error`` holds the message for the user.  Without a movement
        argument all movements of the work are selected.
        """
        work_id = self._parse_id(arguments)
        if work_id is None:
            return None, [], self._usage(command)
        work = self._load_work(work_id)
        if work is None:
            return None, [], f'No work with number {work_id}.'
        movements = work.values['movements']
        if len(arguments) > 1:
            mov_number = self._parse_id(arguments, 1)
            if mov_number is None or mov_number not in movements:
                return None, [], f'Work {work_id} has no movement {arguments[1]}.'
            return work, [mov_number], ''
        return work, list(movements), ''

    # ------------------------------------------------------------------
    # user handling
    # ------------------------------------------------------------------

    def set_user(self, arguments):
        """Make the user whose number is ``arguments[0]`` the active user.

        The number is stored as ``int``.  Whether such a user exists is not
        checked here; ``get_active_user`` reports 'No active user' for
        unknown numbers.  A missing or non-numeric argument leaves the
        active user unchanged.
        """
        user_id = self._parse_id(arguments)
        if user_id is None:
            return self._usage('a')
        self.user = user_id
        return 'User set'

    def get_active_user(self):
        """Return 'first_name sec_name' of the active user, else 'No active user'."""
        try:
            row = self.db_agent.execute(
                'SELECT first_name, sec_name FROM pianist WHERE id = ?',
                (self.user,),
            ).fetchone()
        except sqlite3.Error:
            # Best effort, as before: a failing lookup just means that no
            # user name can be shown.
            return 'No active user'
        if row is None:
            return 'No active user'
        first_name, name = row
        return f'{first_name} {name}'

    def create_new_user(self, arguments):
        """Insert a pianist with first name ``arguments[0]`` and name ``arguments[1]``."""
        if len(arguments) < 2:
            return self._usage('n')
        first_name, name = arguments[0], arguments[1]
        self.db_agent.execute(
            'INSERT INTO pianist (first_name, sec_name) VALUES (?, ?)',
            (first_name, name),
        )
        return f'Created new user {first_name} {name}.'

    def delete_user(self, _):
        """Delete the pianist row of the active user and deactivate the user.

        Only the ``pianist`` row is deleted here; whether the marks of
        mastered works vanish with it depends on the database schema.
        """
        if self.user == -1:
            return 'Please activate user'
        user_name = self.get_active_user()
        self.db_agent.execute('DELETE FROM pianist WHERE id = ?', (self.user,))
        # The id no longer refers to anybody, so it must not stay active.
        self.user = -1
        return f'Deleted user {user_name}'

    # ------------------------------------------------------------------
    # command dispatch
    # ------------------------------------------------------------------

    def invoke_command(self, command, arguments):
        """Run the handler registered for ``command`` and store its text.

        Unknown commands (including ``None``) store
        'Could not understand command.' instead.
        """
        self.resultstring = ''
        entry = self.commands.get(command)
        if entry is None:
            self.resultstring += 'Could not understand command.'
        else:
            self.resultstring += entry[0](arguments)

    def result(self):
        """Return the text produced by the most recent command."""
        return self.resultstring

    # ------------------------------------------------------------------
    # mastery queries
    # ------------------------------------------------------------------

    def movement_is_mastered(self, work_id, mov_id):
        """Return True if the active user has a mark for this movement.

        Always False while no user is active.
        """
        if self.user == -1:
            return False
        row = self.db_agent.execute(
            '''
            SELECT 1
            FROM is_able_to_play
            WHERE work_id = ?
              AND mov_id = ?
              AND pianist_id = ?
            ''',
            (work_id, mov_id, self.user),
        ).fetchone()
        return row is not None

    def percentage_of_work_is_mastered(self, work):
        """Return the share of mastered movements of ``work`` as an int from 0 to 100.

        ``work`` is a work object (not an id): it must offer ``id()`` and
        ``values['movements']``.  A work without movements counts as 0.
        """
        movements = work.values['movements']
        if not movements:
            # Nothing to master; also avoids a division by zero.
            return 0
        count_mastered = sum(
            1 for key in movements if self.movement_is_mastered(work.id(), key)
        )
        return int(count_mastered / len(movements) * 100)

    # ------------------------------------------------------------------
    # listings
    # ------------------------------------------------------------------

    def show_composers(self, _):
        """Return one line 'id: name, first_name' per composer, ordered by name."""
        rows = self.db_agent.execute(
            'SELECT id, first_name, name FROM composer ORDER BY name'
        ).fetchall()
        return ''.join(
            f'{comp_id}: {name}, {first_name}\n' for comp_id, first_name, name in rows
        )

    def show_help(self, arguments):
        """Return the help text of ``arguments[0]``, or of all commands if no argument is given."""
        if len(arguments) > 0:
            qu_com = arguments[0]
            if qu_com in self.commands:
                return f'{qu_com}: {self.commands[qu_com][1]}'
            return f'Command {qu_com} not known, for help press h'
        return ''.join(
            f'{key}: {value[1]}\n' for key, value in self.commands.items()
        )

    def show_mastered(self, _, key='standard'):
        """List every movement the active user has marked as mastered.

        key: 'opus' pre-sorts the works by ``opus_and_wd_for_comparison()``;
            any other value pre-sorts them by ``sammlung()``.  The works are
            then sorted with their own ordering; that sort is stable, so the
            pre-sort decides among works that compare equal.
        """
        if self.user == -1:
            return 'Please activate user'
        work_rows = self.db_agent.execute(
            '''
            SELECT work_id
            FROM is_able_to_play
            WHERE pianist_id = ?
            GROUP BY work_id
            ''',
            (self.user,),
        ).fetchall()
        list_of_works = [work_under_id(row[0], self.db_agent) for row in work_rows]
        if key == 'opus':
            list_of_works.sort(key=lambda work_s: work_s.opus_and_wd_for_comparison())
        else:
            list_of_works.sort(key=lambda work_s: work_s.sammlung())
        list_of_works.sort()
        fun_resultstring = 'All mastered movements:\n'
        for work in list_of_works:
            for mov_number in work.values['movements']:
                if self.movement_is_mastered(work.id(), mov_number):
                    fun_resultstring += f'{work.id()} {mov_number}. {work.pretty_mov(mov_number)}\n'
        return fun_resultstring

    def show_mastered_opus(self, arguments):
        """Same as ``show_mastered`` with ``key='opus'``."""
        return self.show_mastered(arguments, key='opus')

    def show_movements(self, arguments):
        """List the movements of work ``arguments[0]``; '[x]' marks mastered ones."""
        work_id = self._parse_id(arguments)
        if work_id is None:
            return self._usage('sw')
        work = self._load_work(work_id)
        if work is None:
            return f'No work with number {work_id}.'
        fun_resultstring = ''
        for mov_number in work.values['movements']:
            marker = '[x] ' if self.movement_is_mastered(work_id, mov_number) else '[ ] '
            fun_resultstring += f'{marker}{mov_number}. {work.pretty_mov(mov_number)}\n'
        return fun_resultstring

    def show_users(self, _):
        """Return one line per row of ``pianist``, built from its first three columns."""
        rows = self.db_agent.execute('SELECT * FROM pianist;').fetchall()
        return ''.join(f'{item[0]}: {item[1]} {item[2]}\n' for item in rows)

    def show_works(self, arguments):
        """List all works, or only those of composer number ``arguments[0]``.

        Each line starts with '[x]' (all movements mastered), '[/]' (some)
        or '[ ]' (none) for the active user.
        """
        if len(arguments) > 0:
            comp_id = self._parse_id(arguments)
            if comp_id is None:
                return self._usage('s')
            work_rows = self.db_agent.execute(
                'SELECT id FROM work WHERE comp_id = ?', (comp_id,)
            ).fetchall()
        else:
            work_rows = self.db_agent.execute('SELECT id FROM work').fetchall()
        list_of_works = [work_under_id(row[0], self.db_agent) for row in work_rows]
        # Two stable sorts: the works' own ordering decides first, the
        # opus / directory number decides among works that compare equal.
        list_of_works.sort(key=lambda wo: wo.opus_and_wd_for_comparison())
        list_of_works.sort()
        fun_resultstring = ''
        for work in list_of_works:
            mastered = self.percentage_of_work_is_mastered(work)
            if mastered == 100:
                fun_resultstring += '[x] '
            elif mastered > 0:
                fun_resultstring += '[/] '
            else:
                fun_resultstring += '[ ] '
            fun_resultstring += f'{work.id()}: {work.pretty_string()}\n'
        return fun_resultstring

    # ------------------------------------------------------------------
    # marking / unmarking
    # ------------------------------------------------------------------

    def mark_work_as_mastered(self, arguments):
        """Mark a movement, or all movements of a work, as mastered.

        arguments: ``[work_number]`` or ``[work_number, movement_number]``.
        Movements that already carry a mark are reported and not inserted
        a second time.
        """
        if self.user == -1:
            return 'Please activate user'
        work, mov_numbers, error = self._resolve_targets(arguments, 'm')
        if work is None:
            return error
        resultstring = 'adding:\n'
        for mov_number in mov_numbers:
            description = work.pretty_mov(mov_number)
            if self.movement_is_mastered(work.id(), mov_number):
                resultstring += f'Movement was already marked as mastered: {description}\n'
                continue
            self.db_agent.execute(
                '''
                INSERT INTO is_able_to_play (work_id, mov_id, pianist_id)
                VALUES (?, ?, ?)
                ''',
                (work.id(), mov_number, self.user),
            )
            resultstring += f'{description}\n'
        return resultstring

    def remove_work_as_mastered(self, arguments):
        """Remove the mastered mark of a movement, or of all movements of a work.

        arguments: ``[work_number]`` or ``[work_number, movement_number]``.
        Movements without a mark are reported and left alone.
        """
        if self.user == -1:
            return 'Please activate user'
        work, mov_numbers, error = self._resolve_targets(arguments, 'r')
        if work is None:
            return error
        resultstring = 'removing:\n'
        for mov_number in mov_numbers:
            description = work.pretty_mov(mov_number)
            if not self.movement_is_mastered(work.id(), mov_number):
                resultstring += f'Movement was not marked as mastered: {description}\n'
                continue
            self.db_agent.execute(
                '''
                DELETE FROM is_able_to_play
                WHERE work_id = ?
                  AND mov_id = ?
                  AND pianist_id = ?
                ''',
                (work.id(), mov_number, self.user),
            )
            resultstring += f'{description}\n'
        return resultstring

    def quit(self, _):
        """Return the farewell text; ending the loop is up to the caller."""
        return 'Bye bye!'
|
|
|
|
# End session
|
|
|
|
@total_ordering
class work_under_id:
    """One work of the repertoire, loaded from the database by its id.

    ``self.values`` holds the columns of the ``work`` row, the composer's
    ``first_name`` and ``name``, a ``movements`` dict mapping the second
    column of each ``movement`` row to its third to fifth columns
    ``(numb, designation, mus_key)`` and, if all numbered movements share
    one number, that number under ``numb``.

    Works order by composer name (``__lt__``); equality stays the default
    identity comparison.
    """

    # Names given, in this order, to the columns returned by
    # ``SELECT * FROM work``.
    _WORK_COLUMNS = ('id', 'comp_id', 'year', 'opus', 'collection', 'main_key',
                     'title', 'mov_title', 'alias', 'work_directory', 'wd_number')

    def __init__(self, werk_id, db_agent):
        """Load the work, its composer and its movements.

        werk_id: id of the work; an ``int`` or a string holding one.
        db_agent: object offering ``execute(sql, parameters)`` like a
            ``sqlite3`` cursor.

        Raises LookupError if ``werk_id`` is not a number or if the work or
        its composer is not in the database.
        """
        self.values = dict()
        self.reader = db_agent

        try:
            work_key = int(werk_id)
        except (TypeError, ValueError):
            raise LookupError(f'Invalid work id {werk_id!r}.') from None

        # Ids are bound as parameters, never pasted into the SQL text.
        work_row = self.reader.execute(
            'SELECT * FROM work WHERE id = ?', (work_key,)
        ).fetchone()
        if work_row is None:
            raise LookupError(f'No work with id {work_key} in the database.')
        self.values.update(zip(self._WORK_COLUMNS, work_row))

        composer_row = self.reader.execute(
            'SELECT first_name, name FROM composer WHERE id = ?',
            (self.values['comp_id'],),
        ).fetchone()
        if composer_row is None:
            raise LookupError(f'No composer stored for work {work_key}.')
        self.values['first_name'] = composer_row[0]
        self.values['name'] = composer_row[1]

        movement_rows = self.reader.execute(
            'SELECT * FROM movement WHERE work_id = ?', (work_key,)
        ).fetchall()
        saetze = dict()
        nummern = set()
        # columns: 0 work_id, 1 mov_number, 2 numb, 3 designation, 4 mus_key
        for satz in movement_rows:
            saetze[satz[1]] = satz[2], satz[3], satz[4]
            if satz[2] is not None:
                nummern.add(satz[2])
        self.values['movements'] = saetze
        # Exactly one distinct number: it is kept once for the whole work.
        self.sätze_unter_nummer = len(nummern) == 1
        if self.sätze_unter_nummer:
            self.values['numb'] = nummern.pop()

    def sammlung(self):
        """Return the collection of the work, or 'ohne' if it has none."""
        return self.values['collection'] or 'ohne'

    def id(self):
        """Return the database id of the work."""
        return self.values['id']

    def just_num(self, st, default=None):
        """Return the number formed by all decimal digits of ``st``, in order.

        Every non-digit character is dropped, so '27/2' yields 272.
        If ``st`` contains no digit, ``default`` is returned; without a
        default a ValueError is raised.
        """
        num_str = ''.join(c for c in str(st) if c.isdecimal())
        if not num_str:
            if default is None:
                raise ValueError(f'no digits in {st!r}')
            return default
        return int(num_str)

    def opus_and_wd_for_comparison(self):
        """Return the sort key ``(opus number, work directory number)``.

        A missing or digit-free opus counts as 9999, a missing or
        digit-free directory number as 0.
        """
        op = 9999  # no composer has that many!
        if self.values['opus'] is not None:
            op = self.just_num(self.values['opus'], default=9999)
        wd = 0  # no directory starts with 0
        if self.values['wd_number'] is not None:
            wd = self.just_num(self.values['wd_number'], default=0)
        return (op, wd)

    def pretty_string(self):
        """Return a one-line description of the work; unset fields are left out."""
        parts = []
        for key in ('first_name', 'name',
                    'title', 'opus',
                    'main_key', 'alias',
                    'work_directory', 'wd_number'):
            value = self.values[key]
            if value is None:
                continue
            if key == 'opus':
                parts.append(f'op. {value} ')
            else:
                parts.append(f'{value} ')
        return ''.join(parts)

    def pretty_mov(self, mov_number):
        """Return a one-line description of movement ``mov_number`` of the work.

        Raises KeyError if the work has no movement with that number.
        """
        parts = []
        for key in ('first_name', 'name',
                    'title', 'mov_title', 'opus', 'numb',
                    'main_key', 'alias',
                    'movements',
                    'work_directory', 'wd_number'):
            value = self.values.get(key)
            if value is None:
                continue
            if key == 'opus':
                parts.append(f'op. {value} ')
            elif key == 'name':
                parts.append(f'{value}, ')
            elif key == 'numb':
                parts.append(f'Nr. {value} ')
            elif key == 'movements':
                designation = value[mov_number][1]
                if designation is not None:
                    # The movement number is shown only if the work has
                    # more than one movement.
                    if len(value) > 1:
                        parts.append(f'{mov_number}. {designation} ')
                    else:
                        parts.append(f'{designation} ')
            elif key == 'title':
                # The work title is shown only when no movement title is set.
                if self.values.get('mov_title') is None:
                    parts.append(f'{value} ')
            else:
                parts.append(f'{value} ')
        return ''.join(parts)

    def __str__(self):
        """Return all set values, one 'key: value' line each."""
        return ''.join(
            f'{key}: {value}\n'
            for key, value in self.values.items()
            if value is not None
        )

    def __lt__(self, other):
        """Order works by the name of their composer."""
        if not isinstance(other, work_under_id):
            return NotImplemented
        return self.values['name'] < other.values['name']
|
|
|
|
# End work_under_id
|
|
|
|
def parse_user_input(user_in, session):
    """Split a line typed by the user into ``(command, arguments)``.

    user_in: the raw input line.
    session: object whose ``commands`` mapping holds the known command names.

    Returns the first whitespace-separated word and the list of the
    remaining words if that word is a known command.  Returns
    ``(None, None)`` for unknown commands and for empty or blank input.
    """
    words = user_in.split()
    if not words:
        # Nothing typed: there is no first word to look up.
        return (None, None)
    command, *arguments = words
    if command in session.commands:
        return (command, arguments)
    return (None, None)
|
|
|
|
def command_line_loop(session):
    """Read commands from standard input and run them until the user quits.

    Each line is parsed with ``parse_user_input``, executed through
    ``session.invoke_command`` and its result printed.  The loop ends after
    the line ``q`` (surrounding whitespace is ignored) or at end of input,
    so that the caller can still commit and close the database.
    """
    while True:
        try:
            user_in = input(f'Piano-Repertoire: {session.get_active_user()} >>> ')
        except EOFError:
            # End of input (e.g. Ctrl-D or a finished pipe) counts as quitting.
            print()
            break
        if not user_in.strip():
            # Nothing typed: just prompt again.
            continue
        command, arguments = parse_user_input(user_in, session)
        session.invoke_command(command, arguments)
        print(session.result())
        if user_in.strip() == 'q':
            break
|
|
|
|
def main():
    """Open 'repertoire.db', run the interactive session and commit its changes.

    Changes are committed only after the command loop has ended normally.
    The connection is closed in every case, also when the loop raises.
    """
    con = sqlite3.connect('repertoire.db')
    try:
        db_agent = con.cursor()
        # SQLite enforces foreign keys only after this pragma has been set
        # on the connection.
        db_agent.execute('PRAGMA foreign_keys = ON')
        session = Session(db_agent)
        command_line_loop(session)
        con.commit()
        print('changes committed.')
    finally:
        con.close()
        print('db-connection closed. Bye-bye!')
|
|
|
|
# Start the interactive session only when this file is run as a script,
# not when it is imported.
if __name__ == '__main__':
    main()