-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathUnitBlockMerger.pas
226 lines (199 loc) · 6.17 KB
/
UnitBlockMerger.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
unit UnitBlockMerger;
interface
uses
Windows, SysUtils, Classes,
Common, UnitThreadManager;
type
// Worker thread that combines the lines of two input files (FileName1 and
// FileName2) into a single output stream. Execute repeatedly asks
// CompareStrings which of the two current lines goes first and writes it.
TBlockMergerThread = class(TProtoThread)
private
// Paths of the two input files, stored by the constructor.
FFileName1, FFileName2: string;
type
// Read buffer over one input file. It loads the file block by block and
// exposes the current line position through CurPtr.
TBuffer = record
private
// Path of the file behind FStream; Destroy deletes it unless Debug is set.
FFileName: string;
// Start of the memory block allocated in Create (MergeBufferSize + 2 bytes).
FBuffer: PAnsiChar;
// Current read position inside the buffer.
FCurPtr: PAnsiChar;
FBufferEnd: PAnsiChar; // pointer to the end of the last complete line in the buffer (more precisely, to the start of the next line, the one that did not fit completely)
// Input stream opened in Create.
FStream: TStream;
// Number of valid bytes currently held in the buffer, counted from FBuffer.
BytesInBuffer: integer;
// Set to true by ReadBlockFromFile when it appends a CRLF to a trailing line
// that was left in the buffer after the stream had been read to its end.
FLastCRLFAdded: boolean;
// Property setter for CurPtr: stores Value, or loads the next block when
// Value has reached FBufferEnd.
procedure SetCurPtr(const Value: PAnsiChar);
// Moves the unfinished tail of the previous block to the buffer start and
// fills the rest of the buffer from FStream.
procedure ReadBlockFromFile;
public
// Opens FileName for reading, allocates the buffer and loads the first block.
constructor Create(const FileName: string);
// Releases the buffer and the stream; deletes the file unless Debug is set.
procedure Destroy;
// True when the buffered lines are consumed and the stream is at its end.
function IsEmpty: boolean;
// Writes the bytes in [Line, LineEnd) to Stream.
procedure WriteLine(const Stream: TStream; const Line, LineEnd: PAnsiChar);
// Writes all remaining buffered and unread data to Destination.
procedure CopyTo(const Destination: TStream; const ExcludeLastCRLF: boolean);
procedure CheckPtr(var Ptr: PAnsiChar); // checks that the pointer has not moved past the buffer boundary - for the case of a CRLF appended after the last line
property CurPtr: PAnsiChar
read FCurPtr write SetCurPtr;
property LastCRLFAdded: boolean
read FLastCRLFAdded;
end;
public
// NOTE(review): ResultFileName is accepted here but this unit's constructor
// body does not store it - confirm where Execute's ResultFileName comes from.
constructor Create(const BlockIndex: integer; const FileName1, FileName2, ResultFileName: string);
// Performs the merge of the two input files; see the implementation.
procedure Execute; override;
end;
implementation
uses UnitBufferedFileStream;
{ TBlockMergerThread.TBuffer }
{ Clamps Ptr to FBufferEnd when it points beyond it; otherwise leaves Ptr
  untouched. Callers use this on the "next line" pointer, which can overshoot
  the buffered data when the terminating CRLF of the last line was appended
  artificially. }
procedure TBlockMergerThread.TBuffer.CheckPtr(var Ptr: PAnsiChar);
begin
  if FBufferEnd < Ptr then
  begin
    Ptr := FBufferEnd;
  end;
end;
{ Flushes everything this buffer has not emitted yet to Destination: first the
  bytes still held in memory (from the current position up to the end of the
  buffered data), then whatever is still unread in the underlying stream.
  Afterwards the buffer is marked as empty.

  Destination     - stream that receives the data.
  ExcludeLastCRLF - when true, the last two bytes of the copied data are
                    dropped.

  The two bytes are removed from the real end of the output: from the tail of
  the unread stream data when there is any, and from the in-memory data only
  when the stream is already exhausted. Trimming the in-memory data
  unconditionally would cut two bytes out of the middle of the output whenever
  the file still has unread data behind the buffer. }
procedure TBlockMergerThread.TBuffer.CopyTo(const Destination: TStream; const ExcludeLastCRLF: boolean);
const
  CRLFSize = 2;
var
  BufferedBytes: integer;
  StreamBytes: Int64;
begin
  BufferedBytes := FBuffer + BytesInBuffer - FCurPtr;
  StreamBytes := FStream.Size - FStream.Position;

  if ExcludeLastCRLF then
  begin
    if StreamBytes >= CRLFSize then
      Dec(StreamBytes, CRLFSize)
    else
    begin
      // The stream holds fewer than two bytes, so the remainder of the trim
      // comes from the end of the in-memory data.
      Dec(BufferedBytes, CRLFSize - Integer(StreamBytes));
      StreamBytes := 0;
    end;
  end;

  // Nothing is written when the trim leaves no in-memory bytes.
  if BufferedBytes > 0 then
    Destination.WriteBuffer(FCurPtr^, BufferedBytes);

  // TStream.CopyFrom treats Count = 0 as "rewind and copy the whole source",
  // so it may only be called with a positive count.
  if StreamBytes > 0 then
    Destination.CopyFrom(FStream, StreamBytes);

  // Skip any trimmed stream bytes so that IsEmpty reports true from now on.
  FStream.Position := FStream.Size;

  // Set the "buffer is empty" markers.
  FBufferEnd := FBuffer;
  FCurPtr := FBuffer;
  BytesInBuffer := 0;
end;
{ Prepares the buffer for reading FileName: remembers the path, opens the file
  read-only, allocates the buffer memory and loads the first block.
  If the file cannot be opened, a message is printed with writeln and the
  program is terminated with Halt. }
constructor TBlockMergerThread.TBuffer.Create(const FileName: string);
begin
  FFileName := FileName;
  FLastCRLFAdded := false;
  BytesInBuffer := 0;

  try
    FStream := TFileStream.Create(FileName, fmOpenRead);
  except
    writeln('Cannot open temporary file ', FileName);
    Halt;
  end;

  // Two extra bytes leave room for a CRLF, which may be missing after the
  // last line of the file.
  GetMem(FBuffer, MergeBufferSize + 2);

  ReadBlockFromFile;
end;
{ Releases the buffer memory and the input stream, then deletes the input file
  from disk. The file is kept when the Debug flag is set. }
procedure TBlockMergerThread.TBuffer.Destroy;
begin
  FreeMem(FBuffer);
  FStream.Free;

  // In debug mode the file is left in place.
  if Debug then
    Exit;

  DeleteFile(FFileName);
end;
{ Returns true when there is nothing left to read: the current position has
  reached FBufferEnd and the stream position equals the stream size. }
function TBlockMergerThread.TBuffer.IsEmpty: boolean;
begin
  Result := false;

  // Complete lines are still waiting in the buffer.
  if FCurPtr < FBufferEnd then
    Exit;

  Result := FStream.Position = FStream.Size;
end;
// Loads the next block of the file into the buffer.
// Any bytes left after FBufferEnd from the previous read (the start of a line
// that did not fit completely) are moved to the front of the buffer, new data
// is read behind them, FCurPtr is reset to the buffer start and FBufferEnd is
// recomputed.
// NOTE(review): FindLastCRLF and CRLF are declared outside this unit. The code
// treats a nil result as "no CRLF found" and a non-nil result as the address
// of the CRLF itself (hence the Inc by 2) - confirm against their definitions.
procedure TBlockMergerThread.TBuffer.ReadBlockFromFile;
var
BufRestSize: integer;
BufStart: PAnsiChar;
begin
if BytesInBuffer > 0 then
begin
BufRestSize := FBuffer + BytesInBuffer - FBufferEnd; // number of bytes of the line that did not fit into the buffer completely during the previous read
if BufRestSize > 0 then
begin
// move that unfinished piece of the line to the start of the buffer
BufStart := FBuffer + BufRestSize;
Move(FBufferEnd^, FBuffer^, BufRestSize);
end
else
BufStart := FBuffer;
end
else
begin
// first read: nothing is carried over
BufRestSize := 0;
BufStart := FBuffer;
end;
FCurPtr := FBuffer;
if FStream.Position <> FStream.Size then
begin
// read new data behind the carried-over piece; the total never exceeds MergeBufferSize
BytesInBuffer := FStream.Read(BufStart^, MergeBufferSize - BufRestSize) + BufRestSize;
FBufferEnd := FindLastCRLF(BufStart, FBuffer + BytesInBuffer);
if FBufferEnd = nil then
begin
// the last line of the file was read, it fit into the buffer completely and there was no CRLF after it
FBufferEnd := FBuffer + BytesInBuffer;
// a CRLF is stored right behind the data; FBufferEnd, BytesInBuffer and
// FLastCRLFAdded are not changed by this
if (BytesInBuffer < 2) or (PWord(FBufferEnd - 2)^ <> CRLF) then
PWord(FBufferEnd)^ := CRLF;
end
else
Inc(FBufferEnd, 2)
end
else
begin
// the file has already been read to the end and nothing new goes into the buffer
FBufferEnd := FBuffer + BufRestSize;
if BufRestSize > 0 then
begin
// if something was still left in the buffer, it was a last line without a CRLF at the end
PWord(FBufferEnd)^ := CRLF;
Inc(FBufferEnd, 2);
BytesInBuffer := BufRestSize + 2;
FLastCRLFAdded := true;
end;
end;
end;
{ Setter of the CurPtr property. A Value that lies before FBufferEnd becomes
  the new current position. A Value at or beyond FBufferEnd is not stored;
  the next block is loaded instead. }
procedure TBlockMergerThread.TBuffer.SetCurPtr(const Value: PAnsiChar);
begin
  if Value < FBufferEnd then
  begin
    FCurPtr := Value;
  end
  else
  begin
    ReadBlockFromFile;
  end;
end;
{ Writes the bytes from Line up to, but not including, LineEnd to Stream.

  Stream  - destination stream.
  Line    - first byte to write.
  LineEnd - address just past the last byte to write. }
procedure TBlockMergerThread.TBuffer.WriteLine(const Stream: TStream; const Line, LineEnd: PAnsiChar);
var
  LineLength: integer;
begin
  LineLength := LineEnd - Line;
  Stream.WriteBuffer(Line^, LineLength);
end;
{ TBlockMergerThread }
{ Creates the merger thread for block BlockIndex and remembers the two input
  file names.
  NOTE(review): ResultFileName is not used in this body - confirm that the
  ancestor (or another unit) supplies the ResultFileName read by Execute. }
constructor TBlockMergerThread.Create(const BlockIndex: integer; const FileName1, FileName2, ResultFileName: string);
begin
  inherited Create(BlockIndex);

  // Input files, opened later by Execute.
  FFileName2 := FileName2;
  FFileName1 := FileName1;
end;
{ Merges the two input files into the result file.

  Both inputs are read through TBuffer. While both still have lines,
  CompareStrings decides which current line goes first (the line of the first
  file when the result is <= 0); that line is written and its buffer advanced.
  When one input runs out, the rest of the other one is copied in one go.
  Finally both buffers are released, the merge is logged and the inherited
  Execute runs.

  A failure while releasing the second buffer is still treated as
  best-effort, but it is reported through Log rather than being silently
  discarded.

  NOTE(review): ResultFileName is not declared in this unit - presumably it is
  provided by TProtoThread or the Common unit; confirm. }
procedure TBlockMergerThread.Execute;
var
  RF: TStream;
  Buffer1, Buffer2: TBuffer;
  NextStr1, NextStr2: PAnsiChar;
  DropLastCRLF: boolean;
begin
  Buffer1.Create(FFileName1);
  try
    Buffer2.Create(FFileName2);
    try
      RF := TWriteCachedFileStream.Create(ResultFileName, MergeWriteBufferSize, 0, false);
      try
        while not Buffer1.IsEmpty and not Buffer2.IsEmpty do
        begin
          if CompareStrings(Buffer1.CurPtr, Buffer2.CurPtr, NextStr1, NextStr2) <= 0 then
          begin
            // Clamp in case the line end lies behind an artificially added CRLF.
            Buffer1.CheckPtr(NextStr1);
            Buffer1.WriteLine(RF, Buffer1.CurPtr, NextStr1);
            Buffer1.CurPtr := NextStr1;
          end
          else
          begin
            Buffer2.CheckPtr(NextStr2);
            Buffer2.WriteLine(RF, Buffer2.CurPtr, NextStr2);
            Buffer2.CurPtr := NextStr2;
          end;
        end;

        // Append the remainder of whichever input is not exhausted yet.
        DropLastCRLF := Buffer1.LastCRLFAdded or Buffer2.LastCRLFAdded;
        if Buffer1.IsEmpty then
          Buffer2.CopyTo(RF, DropLastCRLF)
        else
          Buffer1.CopyTo(RF, DropLastCRLF);
      finally
        RF.Free;
      end;
    finally
      try
        Buffer2.Destroy;
      except
        // Cleanup stays best-effort, but the failure is recorded in the log.
        on E: Exception do
          Log(Format('Cleanup of %s failed: %s: %s', [FFileName2, E.ClassName, E.Message]));
      else
        Log(Format('Cleanup of %s failed', [FFileName2]));
      end;
    end;
  finally
    Buffer1.Destroy;
  end;
  Log(Format('Merged %s + %s -> %s', [FFileName1, FFileName2, ResultFileName]));
  inherited;
end;
end.