-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmark5_view.py
187 lines (162 loc) · 8 KB
/
mark5_view.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from abstract_machine_view import (Invalid_Selection_Exception,
Abstract_Machine_View, Tree_Widget_Item)
from import_proxy import execute_query, send_query
from shared import format_bytes
import PyQt4.QtGui as QtGui
import PyQt4.QtCore as QtCore
import socket
import contextlib
import time
import collections
class Mark5_View(Abstract_Machine_View):
header_labels = ["Bank", "VSN", "#scan", "Recording", "Size"]
def _get_selection(self):
indices = self.view.selectedIndexes()
# we get one index for each row and column, reduce it to one per row
# and order the bank, rows
row_bank_scan = {}
for index in indices:
if not index.parent().isValid():
raise Invalid_Selection_Exception("Can only operate on "
"recordings, not banks.")
bank = str(self.view.itemFromIndex(index.parent()).text(
self.header_labels.index("Bank")))
assert bank in ["A", "B"]
scan = str(self.view.itemFromIndex(index).text(
self.header_labels.index("Recording")))
number = int(str(self.view.itemFromIndex(index).text(
self.header_labels.index("#scan"))))
row_bank_scan[(bank,index.row())] = (number, scan)
return [(bank, row_bank_scan[(bank, row)]) \
for (bank, row) in sorted(row_bank_scan.keys())]
def _check_selection(self, selection):
text = ""
with contextlib.closing(self._create_socket()) as s:
for (bank, (number, scan)) in selection:
self._select_bank(s, bank)
execute_query(s, "scan_set={scan}".format(scan=number), ["0"])
reply = send_query(s, "scan_check?")
text += reply
return text
def _emit_copy(self, selection):
self.copy_from.emit(
self.mark5.control_ip,
[("mk5://{host}:{port}/{bank}/".format(
host=self.mark5.control_ip, port=self.mark5.port, bank=bank),
str(number) if scan in self.duplicate_recording[bank] else scan)
for (bank, (number, scan)) in selection])
def _get_m5copy_to(self):
indices = self.view.selectedIndexes()
items = list(set(self.view.itemFromIndex(index) for index in indices))
if (len(items) != 1) or \
(items[0].childIndicatorPolicy() != \
Tree_Widget_Item.ShowIndicator):
raise Invalid_Selection_Exception("Only one bank has to be "
"selected as the target bank to copy to.")
return (self.mark5.control_ip, "mk5://{host}:{port}:{data_ip}/{bank}/".\
format(
host=self.mark5.control_ip,
port=self.mark5.port,
data_ip=self.mark5.data_ip,
bank=str(items[0].text(self.header_labels.index("Bank")))))
def _selection_size(self, selection):
return sum([self.recording_sizes[bank][number] \
for (bank, (number, scan)) in selection])
def _create_view_widget(self):
self.view = QtGui.QTreeWidget(self)
self.banks = {bank : Tree_Widget_Item(self.view) \
for bank in ["A", "B"]}
self.recording_sizes = {bank : {} for bank in self.banks.keys()}
self.duplicate_recording = {bank: set() for bank in self.banks.keys()}
for bank, item in self.banks.items():
item.setText(self.header_labels.index("Bank"), bank)
with contextlib.closing(self._create_socket()) as s:
reply = execute_query(s, "bank_set?", ["0"])
for index in [3,5]:
if len(reply) > index:
bank = reply[index-1]
if bank in self.banks.keys():
vsn = reply[index]
bank_item = self.banks[bank]
bank_item.setText(self.header_labels.index("VSN"), vsn)
bank_item.setChildIndicatorPolicy(
Tree_Widget_Item.ShowIndicator)
if index == 3: # active bank
dir_info = execute_query(s, "dir_info?", ["0"])
bank_item.setText(self.header_labels.index("#scan"),
dir_info[2])
bank_item.setText(self.header_labels.index("Size"),
format_bytes(
int(dir_info[3]),
self.bytes_print_size))
else:
# don't want to activate the bank just yet
bank_item.setText(self.header_labels.index("#scan"),
"?")
bank_item.setText(self.header_labels.index("Size"),
"?")
return self.view
def _create_socket(self, timeout=10):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
s.connect((self.mark5.control_ip, self.mark5.port))
return s
def _select_bank(self, socket, bank):
reply = execute_query(socket, "bank_set?", ["0", "1"])
while reply[1] == "1":
time.sleep(0.1)
reply = execute_query(socket, "bank_set?", ["0", "1"])
if reply[2] != bank:
reply = execute_query(socket, "bank_set={b}".format(b=bank),
["0", "1"])
while reply[1] in ["1", "6"]:
time.sleep(0.1)
reply = execute_query(socket, "bank_set?", ["0", "6"])
assert reply[2] == bank
def _display_bank_info(self, root):
data = self.background_data[root]
root.setText(self.header_labels.index("#scan"), str(len(data.scans)))
root.setText(self.header_labels.index("Size"),
format_bytes(data.size, self.bytes_print_size))
for index, (recording, size) in enumerate(data.scans):
item = Tree_Widget_Item(root)
item.setText(self.header_labels.index("#scan"), str(index+1))
item.setText(self.header_labels.index("Recording"), recording)
item.setText(self.header_labels.index("Size"),
format_bytes(size, self.bytes_print_size))
def _get_bank_info(self, bank, root):
data = self.background_data[root]
with contextlib.closing(self._create_socket()) as s:
self._select_bank(s, bank)
dir_info = execute_query(s, "dir_info?", ["0"])
# refresh display data
scans = int(dir_info[2])
data.size = int(dir_info[3])
data.scans = []
seen = set()
for scan_index in xrange(scans):
execute_query(s, "scan_set={i}".format(i=scan_index+1),
["0"])
reply = execute_query(s, "scan_set?", ["0"])
number = int(reply[2])
recording = reply[3]
bytes_ = int(reply[5]) - int(reply[4])
self.recording_sizes[bank][number] = bytes_
data.scans.append((recording, bytes_))
if recording in seen:
self.duplicate_recording[bank].add(recording)
else:
seen.add(recording)
def _expand_disk(self, index):
bank = str(self.view.itemFromIndex(index).text(
self.header_labels.index("Bank")))
bank_item = self.banks.get(bank)
if bank_item and (bank_item.childCount() == 0):
self.recording_sizes[bank] = {}
self.load_in_background(bank_item,
lambda: self._get_bank_info(bank, bank_item),
lambda: self._display_bank_info(bank_item))
def __init__(self, mark5, parent=None):
self.mark5 = mark5
super(Mark5_View, self).__init__(parent)
self.view.expanded.connect(self._expand_disk)