root/trunk/bknr/datastore/src/data/txn.lisp

Revision 3946, 32.1 kB (checked in by hans, 2 months ago)

Implement rollback for anonymous transactions. All slot changes
made in a transaction will be undone when an error is signalled
from within the transaction. As changes to the ID slots of created
objects are undone as well, objects created in the transaction will
be removed from their indices and eventually garbage collected.

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1 (in-package :bknr.datastore)
2
3 (cl-interpol:enable-interpol-syntax)
4
5 (defvar *store-debug* nil
6   "Trace and time execution of transactions")
7
8 ;;; conditions
9
10 (define-condition store-error (error)
11   ())
12
13 (define-condition not-in-transaction (store-error)
14   ()
15   (:documentation
16    "Signaled when an operation on persistent slots is executed outside
17    a transaction context"))
18
19 (define-condition store-not-open (store-error)
20   ()
21   (:documentation
22    "Signaled when a transaction is executed on a store that is not
23    opened"))
24
25 (define-condition store-already-open (store-error)
26   ()
27   (:documentation
28    "Signaled when an attempt is made to open a store with another
29    store being open"))
30
31 (define-condition invalid-store-random-state (store-error)
32   ()
33   (:documentation
34    "Signaled when the on-disk store random state cannot be read,
35    typically because it has been written with another Lisp"))
36
37 (define-condition unsupported-lambda-list-option (store-error)
38   ((option :initarg :option :reader option))
39   (:documentation
40    "Signaled when DEFTRANSACTION is used with an unsupported option in
41    its lambda list"))
42
43 (define-condition default-arguments-unsupported (store-error)
44   ((tx-name :initarg :tx-name :reader tx-name)
45    (argument :initarg :argument :reader argument))
46   (:report (lambda (c stream)
47              (format stream "argument ~A defaulted in DEFTRANSACTION ~S"
48                      (argument c) (tx-name c))))
49   (:documentation
50    "Signaled when an argument in a DEFTRANSACTION definition has a
51    default declaration"))
52
53 (define-condition undefined-transaction (store-error)
54   ((tx-name :initarg :tx-name :reader tx-name))
55   (:report (lambda (c stream)
56              (format stream "undefined transaction ~A in transaction log, please ensure that all the necessary code is loaded."
57                      (tx-name c))))
58   (:documentation
59    "Signaled when a named transaction is loaded from the transaction
60    log and no matching function definition could be found"))
61
62 (define-condition invalid-transaction-nesting (store-error)
63   ()
64   (:documentation
65    "Signaled when WITH-TRANSACTION forms are nested."))
66
67 (define-condition anonymous-transaction-in-transaction (store-error)
68   ()
69   (:documentation
70    "Signaled when an anonymous transaction is started from within another transaction, transactions do not nest."))
71
72 (define-condition no-subsystems (store-error)
73   ()
74   (:documentation
75    "Signaled when an attempt is made to snapshot a store without subsystems"))
76
77 (define-condition invalid-environment-access (store-error)
78   ((function :initarg function))
79   (:report (lambda (e stream)
80              (with-slots (function) e
81                (format stream "A transaction function attempted to access the function ~A which ~
82                                would make execution of the transaction non-repeatable."
83                        function)))))
84
85 ;;; Verbose progress reporting of store operations
86
87 (defvar *store-verbose* t)
88
89 (defun report-progress (fmt &rest args)
90   (when *store-verbose*
91     (apply #'format *trace-output* fmt args)))
92
93 ;;; store
94
95 (defvar *store*)
96
97 (defmacro with-store ((store &key) &body body)
98   `(let ((*store* ,store))
99      ,@body))
100
101 (defclass store ()
102   ((directory :initarg :directory
103               :accessor store-directory)
104    (state :accessor store-state
105           :initform :closed
106           :documentation "State of the datastore, can be either :closed, :opened or :read-only")
107    (transaction-log-stream :accessor store-transaction-log-stream :initform nil)
108    (random-state :accessor store-random-state
109                  :initform nil)
110    (guard :reader store-guard
111           :initarg :guard)
112    (log-guard :reader store-log-guard
113               :initarg :log-guard)
114    (subsystems :reader store-subsystems
115                :initarg :subsystems)
116    (transaction-run-time :accessor store-transaction-run-time
117                          :initform 0
118                          :documentation "The total run time of all application transaction code since last snapshot"))
119   (:default-initargs
120    :guard #'funcall
121     :log-guard #'funcall
122     :subsystems (list (make-instance 'store-object-subsystem))))
123
124 (defclass mp-store (store)
125   ()
126   (:default-initargs :guard (let ((lock (mp-make-lock)))
127                               (lambda (thunk)
128                                 (mp-with-recursive-lock-held (lock)
129                                   (funcall thunk))))
130     :log-guard (let ((lock (mp-make-lock)))
131                  (lambda (thunk)
132                    (mp-with-recursive-lock-held (lock)
133                      (funcall thunk)))))
134   (:documentation
135    "Store in which every transaction and operation is protected by a giant lock."))
136
137 (defmethod print-object ((store store) stream)
138   (print-unreadable-object (store stream :type t)
139     (format stream "DIR: \"~a\"" (namestring (store-directory store)))))
140
141 (defgeneric initialize-subsystem (subsystem store store-existed-p))
142 (defmethod initialize-subsystem ((subsystem t) store store-existed-p)
143   (declare (ignore store store-existed-p)))
144
145 (defmethod initialize-instance :before ((store store) &key (make-default t))
146   (when make-default
147     (restart-case
148         (when (and (boundp '*store*)
149                    *store*)
150           (error 'store-already-open))
151       (close-store ()
152         :report "Close the opened store."
153         (close-store)))))
154
155 (defmethod initialize-instance :after ((store store) &key (make-default t))
156   (when (stringp (store-directory store))
157     (setf (store-directory store) (pathname (store-directory store))))
158   (when make-default
159     (setf *store* store))
160   (with-store (store)
161     (let ((store-existed-p (probe-file (store-current-directory store))))
162       (ensure-store-current-directory store)
163       (dolist (subsystem (store-subsystems store))
164         (when *store-debug*
165           (report-progress "Initializing subsystem ~A of ~A~%" subsystem store))
166         (initialize-subsystem subsystem store store-existed-p))
167       (restore-store store))
168     (setf (store-state store) :opened)))
169
170 (defmethod close-store-object ((store store))
171   (close-transaction-log-stream store)
172   (dolist (subsystem (store-subsystems store))
173     (close-subsystem store subsystem))
174   (setf (store-state store) :closed))
175
176 (defun open-store (directory &key (class-name #-mp 'store #+mp 'mp-store) (subsystems (list (make-instance 'store-object-subsystem))))
177   (close-store)
178   (make-instance class-name :directory directory :subsystems subsystems))
179
180 (defun close-store ()
181   (makunbound '*store*))
182
183 (defmacro with-store-guard ((&optional (store '*store*)) &rest body)
184   "Execute BODY in the context of the guard of STORE."
185   `(funcall (store-guard ,store) #'(lambda () ,@body)))
186
187 (defmacro with-log-guard ((&optional (store '*store*)) &rest body)
188   "Execute BODY in the context of the log file guard of STORE."
189   `(funcall (store-log-guard ,store) #'(lambda () ,@body)))
190
191 (defmacro with-store-state ((state &optional (store '*store*)) &rest body)
192   (let ((old-state (gensym)))
193     `(let ((,old-state (store-state ,store)))
194        (setf (store-state ,store) ,state)
195        (unwind-protect (progn ,@body)
196          (setf (store-state ,store) ,old-state)))))
197
198 ;; datastore pathnames
199
200 (defgeneric store-current-directory (store)
201   (:documentation "Returns the name of the current datastore directory."))
202
203 (defmethod store-current-directory ((store store))
204   (merge-pathnames (make-pathname :directory '(:relative "current"))
205                    (store-directory store)))
206
207 (defmethod ensure-store-current-directory ((store store))
208   (ensure-directories-exist (store-current-directory store)))
209
210 (defmethod store-random-state-pathname ((store store))
211   (merge-pathnames #P"random-state" (store-current-directory store)))
212
213 (defun initialize-store-random-state (store)
214   (with-open-file (f (store-random-state-pathname store)
215                      :direction :output :if-does-not-exist :create :if-exists :supersede)
216     (report-progress "initializing store random state~%")
217     (with-standard-io-syntax
218       (prin1 (setf (store-random-state store) (make-random-state t)) f))))
219
220 (defmethod ensure-store-random-state ((store store))
221   (if (probe-file (store-random-state-pathname store))
222       (with-open-file (f (store-random-state-pathname store))
223         (restart-case
224             (setf (store-random-state store)
225                   (handler-case
226                       (read f)
227                     (error (e)
228                       (declare (ignore e))
229                       (error 'invalid-store-random-state))))
230           (initialize-store-random-state ()
231             :report "Initialize the random state of the store.  Use
232 this to reinitialize the random state of the store when porting over a
233 store from another compiler. When transactions of the application
234 depend on the random state, you must snapshot your store before
235 porting to the new compiler."
236             (initialize-store-random-state store))
237           (ignore-store-random-state ()
238             :report "Ignore the on-disk random state of the store.
239 Use this if you want to test a store with another compiler, but do not
240 want to change the store permanently."
241             (setf (store-random-state store) (make-random-state t)))))
242       (initialize-store-random-state store)))
243
244 (defmethod update-store-random-state ((store store))
245   (with-open-file (f (store-random-state-pathname store)
246                      :direction :output :if-does-not-exist :create :if-exists :supersede)
247     (with-standard-io-syntax
248       (prin1 (store-random-state store) f))))
249
250 (defgeneric store-transaction-log-pathname (store-or-directory)
251   (:documentation "Return the pathname of the current transaction log of STORE"))
252
253 (defmethod store-transaction-log-pathname ((directory pathname))
254   (merge-pathnames "transaction-log" directory))
255
256 (defmethod store-transaction-log-pathname ((store store))
257   (store-transaction-log-pathname (store-current-directory store)))
258
259 (defgeneric store-subsystem-snapshot-pathname (store-or-directory subsystem)
260   (:documentation "Return the pathname of the snapshot of SUBSYSTEM of STORE"))
261
262 (defmethod store-subsystem-snapshot-pathname ((directory pathname) subsystem)
263   (let ((name (string-downcase (symbol-name (class-name (class-of subsystem))))))
264     (merge-pathnames (format nil "~a-snapshot" name) directory)))
265
266 (defmethod store-subsystem-snapshot-pathname ((store store) subsystem)
267   (store-subsystem-snapshot-pathname (store-current-directory store) subsystem))
268
269 (defgeneric close-transaction-log-stream (store))
270 (defgeneric store-transaction-log-stream (store))
271
272 (defmethod store-transaction-log-stream :before ((store store))
273   (with-slots (transaction-log-stream) store
274     (unless transaction-log-stream
275       (setf transaction-log-stream (open (store-transaction-log-pathname store)
276                                          :element-type '(unsigned-byte 8)
277                                          :direction :output
278                                          :if-does-not-exist :create
279                                          :if-exists :append
280                                          #+openmcl :sharing #+openmcl :lock)))))
281
282 (defmethod close-transaction-log-stream ((store store))
283   (with-slots (transaction-log-stream) store
284     (when transaction-log-stream
285       (close transaction-log-stream)
286       (setf transaction-log-stream nil))))
287
288 ;;; transaction
289
290 ;;; named transactions - These transactions carry a name and are logged
291 ;;; to the transaction log file as soon as the transaction function
292 ;;; returns.  They are usually defined using 'deftransaction'.
293
294 (defclass transaction ()
295   ((function-symbol :initarg :function-symbol
296                     :reader transaction-function-symbol
297                     :documentation
298                     "Symbol of the function called when executing the transaction")
299    (args :initarg :args
300          :reader transaction-args
301          :initform nil)
302    (timestamp :initarg :timestamp
303               :accessor transaction-timestamp
304               :initform (get-universal-time))))
305
306 (defvar *current-transaction* nil)
307
308 (defun in-transaction-p ()
309   (or *current-transaction*
310       (eq :restore (store-state *store*))))
311
312 (defun current-transaction-timestamp ()
313   (transaction-timestamp *current-transaction*))
314
315 (defun store-open-p ()
316   (not (eq :closed (store-state *store*))))
317
318 (defun store-current-transaction ()
319   (if (in-transaction-p)
320       *current-transaction*
321       (error 'not-in-transaction)))
322
323 ;;; All transactions are executed by an 'executor', which is the store
324 ;;; itself or, in the case of a nested transaction, the parent
325 ;;; transaction.  Named transactions do not explicitly log the nested
326 ;;; transactions as the nesting is implicit, meaning that any repeated
327 ;;; execution of the transactions while rolling forward the
328 ;;; transaction log will automatically repeat the sequence of nested
329 ;;; transaction executions by the program code executed.  Contrasted
330 ;;; to that, an anonymous transaction has no implicit nesting, so any
331 ;;; nested transactions which are called are explicitly logged.
332
333 (defgeneric execute-transaction (executor transaction)
334   (:documentation "Execute TRANSACTION on EXECUTOR (which may be a store or a transaction scope).")
335
336   (:method :before ((executor t) (transaction t))
337            (unless (store-open-p)
338              (error 'store-not-open)))
339
340   (:method ((executor transaction) transaction)
341     (execute-unlogged transaction)))
342
343 (defun find-doc (body)
344   "Given a function definition BODY, extract the docstring, if any.
345 Skips over any declarations that precede the docstring.  See also CLHS
346 3.4.11"
347   (do ((body body (cdr body)))
348       ((or (not (listp (car body)))
349            (not (eq 'declare (caar body))))
350        (when (and (stringp (car body))
351                   (cdr body))
352          (car body)))))
353
354 (defun insert-after-declarations (body forms-to-insert)
355   "Given a function definition body, insert FORMS-TO-INSERT after all
356 declarations and documentation in BODY."
357   (loop for rest on body
358      for form = (car rest)
359      with decls
360      with doc
361      while (or (and (listp form) (eq 'declare (car form)))
362                (and (not doc) (cdr rest) (stringp form)))
363      when (stringp form)
364      do (setf doc form)
365      do (push form decls)
366      finally (return-from insert-after-declarations (append (nreverse decls) forms-to-insert rest))))
367
368 (defun make-args (args)
369   "Parse the lambda list ARGS, returning a list that contains the
370 arguments in the lambda list prepared so that the list can be applied
371 to a function accepting that lambda list.
372
373 For example:
374
375  (MAKE-ARGS '(A B &OPTIONAL C &REST D &KEY E F)) => (A B C :E E :F F)
376
377 It is used to forward arguments to a transaction wrapper generated by
378 DEFTRANSACTION to the actual transaction so that the wrapper function
379 can be declared with the lambda list of the transaction function
380 itself,"
381   (do ((args args (cdr args))
382        result
383        in-keywords-p)
384       ((not args)
385        (nreverse result))
386     (let ((arg (funcall (if (listp (car args)) #'caar #'car) args)))
387       (cond
388         ((eql #\& (aref (symbol-name arg) 0))
389          (case arg
390            (&optional)
391            (&rest (setf args (cdr args))) ; skip argument, too
392            (&key (setf in-keywords-p t))
393            (otherwise (error 'unsupported-lambda-list-option :option arg))))
394         (t
395          (when in-keywords-p
396            (push (intern (symbol-name arg) :keyword) result))
397          (push arg result))))))
398
399 (defmacro deftransaction (name (&rest args) &rest body)
400   "Define a transaction function tx-NAME and a function NAME executing
401 tx-NAME in the context of the current store. The arguments to NAME
402 will be serialized to the transaction-log, and must be supported by
403 the binary encoder. tx-NAME will be called during a roll-forward to
404 repeat any effects that the transaction function had on the persistent
405 store."
406   (let ((name name)
407         (args args)
408         (body body))
409     (dolist (arg args)
410       (when (listp arg)
411         (error 'default-arguments-unsupported :tx-name name :argument (car arg))))
412     (let ((tx-name (intern (format nil "TX-~A" name)
413                            (symbol-package name))))
414       `(progn
415          (defun ,tx-name ,args
416            ,@(insert-after-declarations body
417                                         '((unless (in-transaction-p)
418                                             (error 'not-in-transaction)))))
419          (defun ,name ,args
420            ,@(let ((doc (find-doc body)))
421                   (when doc (list (format nil "[Transaction function wrapper ~A invokes a store transaction]~%~A" name doc))))
422            ,@(let ((rest (member '&rest args)))
423                   (when rest `((declare (ignore ,(second rest))))))
424            (execute (make-instance 'transaction
425                                    :function-symbol ',tx-name
426                                    :timestamp (get-universal-time)
427                                    :args (list ,@(make-args args)))))))))
428
429 (defmethod encode-object ((object transaction) stream)
430   (%write-tag #\T stream)
431   (%encode-symbol (transaction-function-symbol object) stream)
432   (%encode-integer (transaction-timestamp object) stream)
433   (%encode-list (transaction-args object) stream))
434
435 (defmethod decode-object ((tag (eql #\T)) stream)
436   (make-instance 'transaction
437                  :function-symbol (%decode-symbol stream)
438                  :timestamp (%decode-integer stream)
439                  :args (%decode-list stream)))
440
441 (defmethod print-object ((transaction transaction) stream)
442   (print-unreadable-object (transaction stream :type t)
443     (format stream "~A ~A ~{~A~^ ~}"
444             (format-date-time (transaction-timestamp transaction))
445             (transaction-function-symbol transaction)
446             (transaction-args transaction))))
447
448 ;;; operations on transactions
449
450 (defgeneric execute-unlogged (transaction)
451   (:documentation "Invokes the transaction application code
452 by calling the function named by the transactions' function-symbol.
453 Called at transaction execution and restore time within different
454 environment.  May be overridden by specialized transaction types,
455 e.g. the anonymous transaction which writes a set of transactions
456 to the log file in an atomic group"))
457
458 (defmethod execute-unlogged :around ((transaction transaction))
459   "Execute transaction unsafely, catching errors"
460   (let (retval
461         (execution-time 0))
462     (tagbody
463      again
464        (restart-case
465            (let ((start-time (common-lisp::get-internal-run-time))
466                  (*random-state* (store-random-state *store*)))
467              (setf retval (call-next-method))
468              (setf execution-time (- (common-lisp::get-internal-run-time) start-time)))
469          (retry-transaction ()
470            :report (lambda (stream) (format stream "Retry the transaction ~A." transaction))
471            (go again))))
472     (incf (store-transaction-run-time *store*) execution-time)
473     retval))
474
475 (defmethod execute-unlogged :before ((transaction transaction))
476   (when *store-debug*
477     (report-progress "executing transaction ~A at timestamp ~A~%" transaction
478                      (transaction-timestamp transaction))))
479
480 (defmethod execute-unlogged ((transaction transaction))
481   (with-store-guard ()
482     (let ((*current-transaction* transaction))
483       (apply (or (symbol-function (transaction-function-symbol transaction))
484                  (error 'undefined-transaction
485                         :tx-name (transaction-function-symbol transaction)))
486              (transaction-args transaction)))))
487
488 (defun fsync (stream)
489   ;; FINISH-OUTPUT macht leider auch nichts anderes als FORCE-OUTPUT,
490   ;; dabei waere sync()-Semantik zu erwarten.
491   (finish-output stream)
492   #+cmu
493   (unix:unix-fsync (kernel::fd-stream-fd stream))
494   #+sbcl
495   (sb-posix:fsync (sb-kernel::fd-stream-fd stream)))
496
497 (defvar *disable-sync* nil)
498
499 (defmacro without-sync (() &body body)
500   ;; Bei laengeren Importvorgaengen benoetigt das syncen des Transaktionslogs
501   ;; viel Zeit, ist aber an der Stelle nicht notwendig.
502   ` (unwind-protect
503          (let ((*disable-sync* t))
504            ,@body)
505       (with-log-guard ()
506         (fsync (store-transaction-log-stream *store*)))))
507
508 (defmacro with-transaction-log ((transaction) &body body)
509   (check-type transaction symbol) ; otherwise care for multiple evaluation
510   `(with-store-guard ()
511      (when (in-transaction-p)
512        (error 'invalid-transaction-nesting))
513      (with-store-state (:transaction)
514        (prog1
515            (let ((*current-transaction* ,transaction))
516              ,@body)
517          (with-log-guard ()
518            (let ((out (store-transaction-log-stream *store*)))
519              (encode ,transaction out)
520              (unless *disable-sync*
521                (fsync out))))))))
522
523 (defvar *transaction-statistics* (make-statistics-table))
524
525 (defmethod execute-transaction ((store store) (transaction transaction))
526   (with-statistics-log (*transaction-statistics* (transaction-function-symbol transaction))
527     (with-transaction-log (transaction)
528       (execute-unlogged transaction))))
529
530 (defun execute (transaction)
531   "Interface routine to execute a transaction, called through
532 the deftransaction macro and by subsystems.  Executes the
533 transaction either with the store or with the currently active
534 transaction, if any."
535   (execute-transaction (if (in-transaction-p)
536                            *current-transaction*
537                            *store*)
538                        transaction))
539
540 ;;; anonymous transactions - During execution of such a transactions,
541 ;;; nothing is written to a log.  After leaving the body of the
542 ;;; with-transaction block, all transactions which have been executed
543 ;;; are written to the log as a group which will be restored atomically.
544
545 ;;; The actual writing to the transaction log is performed by the
546 ;;; with-transaction macro.
547
548 ;;; An anonymous transaction has an optional label which is stored in
549 ;;; the transaction log in order to make the source code location where
550 ;;; the actual transaction code lives identifieable.
551
552 (defclass anonymous-transaction (transaction)
553   ((label :initarg :label
554           :accessor anonymous-transaction-label
555           :initform (error "missing label in anonymous transaction definition"))
556    (log-buffer :initarg :log-buffer
557                :accessor anonymous-transaction-log-buffer
558                :initform (flex:make-in-memory-output-stream))
559    (undo-log :initform nil
560              :accessor anonymous-transaction-undo-log)))
561
562 (defmethod print-object ((transaction anonymous-transaction) stream)
563   (print-unreadable-object (transaction stream :type t)
564     (format stream "~A ~A (~A)"
565             (format-date-time (transaction-timestamp transaction))
566             (anonymous-transaction-label transaction)
567             (class-name (class-of (anonymous-transaction-log-buffer transaction))))))
568
569 (defmethod in-anonymous-transaction-p ()
570   (subtypep (type-of *current-transaction*) 'anonymous-transaction))
571
572 (defmethod encode-object ((transaction anonymous-transaction) stream)
573   (%write-tag #\N stream)
574   (%encode-string (anonymous-transaction-label transaction) stream)
575   (let ((subtxns (flex:get-output-stream-sequence (anonymous-transaction-log-buffer transaction))))
576     (%encode-integer (length subtxns) stream)
577     (write-sequence subtxns stream)))
578
579 (defmethod decode-object ((tag (eql #\G)) stream)
580   (make-instance 'anonymous-transaction
581                  :transactions (%decode-list stream)))
582
583 (defvar *txn-log-stream* nil
584   "This variable is bound to the transaction log stream while loading
585    the transaction log.  It is used by anonymous transactions to read
586    the subtransactions from the log.")
587
588 (defmethod decode-object ((tag (eql #\N)) stream)
589   (let* ((label (%decode-string stream))
590          (length (%decode-integer stream))
591          (buffer (make-array length :element-type '(unsigned-byte 8))))
592     (read-sequence buffer stream)
593     (make-instance 'anonymous-transaction
594                    :label label
595                    :log-buffer (flex:make-in-memory-input-stream buffer))))
596
597 (define-condition rollback-failed (error)
598   ((transaction :initarg transaction)
599    (original-error :initarg :original-error))
600   (:report (lambda (e stream)
601              (with-slots (transaction original-error) e
602                (format stream "Rollback of transaction ~A failed: ~A" transaction original-error)))))
603
604 (defun anonymous-transaction-undo (transaction)
605   (handler-case
606       (dolist (command (anonymous-transaction-undo-log transaction))
607         (apply (car command) (cdr command)))
608     (error (e)
609       (error 'rollback-failed
610              :transaction transaction
611              :original-error e))))
612
613 (defun do-with-transaction (label thunk)
614   (when (in-transaction-p)
615     (error 'anonymous-transaction-in-transaction))
616   (let ((txn (make-instance 'anonymous-transaction :label label))
617         (next-object-id (next-object-id (store-object-subsystem))))
618     (with-transaction-log (txn)
619       (handler-case
620           (funcall thunk)
621         (error (e)
622           (setf (next-object-id (store-object-subsystem)) next-object-id)
623           (anonymous-transaction-undo txn)
624           (error e))))))
625
626 (defmacro with-transaction ((&optional label) &body body)
627   `(do-with-transaction ,(if (symbolp label) (symbol-name label) label)
628      (lambda () ,@body)))
629
630 (defmethod execute-unlogged ((transaction anonymous-transaction))
631   ;; EXECUTE-UNLOGGED is called for anonymous transactions only when
632   ;; restoring from the transaction log.  It reads and executes the
633   ;; subtransactions from the transaction log.
634   (assert (eq :restore (store-state *store*)) ()
635           "Unexpected store state ~A for EXECUTE-UNLOGGED on an anonymous transaction" (store-state *store*))
636   (let ((stream (anonymous-transaction-log-buffer transaction)))
637     (handler-case
638         (loop
639            (execute-unlogged (decode stream)))
640       (end-of-file ()))))
641
642 (defmethod execute-transaction :before ((executor anonymous-transaction) transaction)
643   (encode transaction (anonymous-transaction-log-buffer executor)))
644
645 ;;; Subsystems
646
647 (defgeneric snapshot-subsystem (store subsystem))
648 (defgeneric close-subsystem (store subsystem))
649
650 (defmethod close-subsystem ((store store) (subsystem t)))
651
652 (defun snapshot ()
653   (snapshot-store *store*))
654
655 (defun make-backup-directory (store)
656   "Create directory pathname to place backup for STORE in.  By
657 default, the current time stamp is used.  If that directory already
658 exists, attach a dot and an incrementing number to the directory
659 pathname until a non-existant directory name has been found."
660   (loop with timetag = (timetag)
661      for i = nil then (if i (incf i) 1)
662      for directory = (merge-pathnames (make-pathname :directory (list :relative (format nil "~A~@[.~A~]" timetag i)))
663                                       (store-directory store))
664      unless (probe-file directory)
665      return directory))
666
667 (defmethod snapshot-store ((store store))
668   (unless (store-open-p)
669     (error 'store-not-open))
670   (when (null (store-subsystems store))
671     (error 'no-subsystems))
672   (ensure-store-current-directory store)
673   (with-store-state (:read-only store)
674     (with-store-guard ()
675       (with-log-guard ()
676         (let ((backup-directory (make-backup-directory store)))
677           (close-transaction-log-stream store)
678
679           ;; CMUCL will, dass das directory existiert, ACL nicht
680           #+(or cmu sbcl)
681           (ensure-directories-exist backup-directory)
682
683           (when *store-debug*
684             (warn "Backup of the datastore in ~A."
685                   backup-directory))
686           (rename-file (store-current-directory store) backup-directory)
687           (ensure-store-current-directory store)
688
689           (let ((error t))
690             (unwind-protect
691                  (with-store-state (:snapshot)
692                    (update-store-random-state store)
693                    (dolist (subsystem (store-subsystems store))
694                      (when *store-debug*
695                        (report-progress "Snapshotting subsystem ~A of ~A~%" subsystem store))
696                      (snapshot-subsystem store subsystem)
697                      (when *store-debug*
698                        (report-progress "Successfully snapshotted ~A of ~A~%" subsystem store)))
699                    (setf (store-transaction-run-time store) 0)
700                    (setf error nil))
701               (when error
702                 (warn "Restoring backup ~A to current." backup-directory)
703                 (rename-file backup-directory (store-current-directory store))))))))))
704
705 (defvar *show-transactions* nil)
706
707 (defun truncate-log (pathname position)
708   (let ((backup (make-pathname :type "backup" :defaults pathname)))
709     (report-progress "~&; creating log file backup: ~A~%" backup)
710     (with-open-file (s pathname
711                        :element-type '(unsigned-byte 8)
712                        :direction :input)
713       (with-open-file (r backup
714                          :element-type '(unsigned-byte 8)
715                          :direction :output)
716         (copy-stream s r))))
717   (report-progress "~&; truncating transaction log at position ~D.~%" position)
718   #+cmu
719   (unix:unix-truncate (ext:unix-namestring pathname) position)
720   #+sbcl
721   (sb-posix:truncate (namestring pathname) position)
722   #+openmcl
723   (ccl:with-cstrs ((filename (namestring pathname)))
724     (#_truncate filename position))
725   #-(or cmu sbcl openmcl)
726   (error "don't know how to truncate files on this platform"))
727
728 (defun load-transaction-log (pathname &key until)
729   (let (length position txn)
730     (restart-case
731         (with-open-file (s pathname
732                            :element-type '(unsigned-byte 8)
733                            :direction :input)
734           (setf length (file-length s))
735           (loop
736              (setf position (file-position s))
737              (unless (< position length)
738                (return))
739              (setf txn (decode s))
740              (cond
741                ((and until
742                      (> (transaction-timestamp txn) until))
743                 (truncate-log pathname position)
744                 (return-from load-transaction-log))
745                (t
746                 (when *show-transactions*
747                   (report-progress "~&;;; ~A txn @~D: ~A~%" (transaction-timestamp txn) position txn))
748                 (let ((*txn-log-stream* s))
749                   (execute-unlogged txn))))))
750       (discard ()
751         :report (lambda (stream) (format stream "Discard transaction log before failing transaction ~A." txn))
752         (truncate-log pathname position)
753         ;; Should maybe throw instead of recursively restoring?  Maybe
754         ;; not, we'll never go into depths > 1 anyway.
755         (restore)))))
756
757 (defgeneric restore-subsystem (store subsystem &key until))
758
759 (defun restore (&optional until)
760   (restore-store *store* :until until))
761
762 (defmethod restore-store ((store store) &key until)
763   (ensure-store-random-state store)
764   (report-progress "restoring ~A~%" store)
765   (let ((*store* store))
766     (setf (store-state store) :opened)
767     (with-store-state (:restore)
768       (with-store-guard ()
769         (with-log-guard ()
770           (close-transaction-log-stream store)
771           (let ((transaction-log (store-transaction-log-pathname store))
772                 (error t))
773             ;; restore the subsystems
774             (unwind-protect
775                  (progn
776                    ;; Subsystems may not do any persistent changes when restoring.
777                    (dolist (subsystem (store-subsystems store))
778                      ;; check that UNTIL > snapshot date
779                      (when *store-debug*
780                        (report-progress "Restoring the subsystem ~A of ~A~%" subsystem store))
781                      (restore-subsystem store subsystem :until until))
782                    (when (probe-file transaction-log)
783                      (report-progress "loading transaction log ~A~%" transaction-log)
784                      (setf (store-transaction-run-time store) 0)
785                      (load-transaction-log transaction-log :until until))
786                    (setf error nil))
787               (when error
788                 (dolist (subsystem (store-subsystems store))
789                   (when *store-debug*
790                     (report-progress "Closing the subsystem ~A of ~A~%"
791                                      subsystem store))
792                   (close-subsystem store subsystem)
793                   (setf (store-state store) :closed))))))))))
794
795 #|
796 (defmacro disallow-cl-function-in-transaction (function)
797   `(defun ,function (&rest args)
798      (when (in-transaction-p)
799        (error 'invalid-environment-access :function ',function))
800      (apply (find-symbol ,(symbol-name function) :common-lisp) args)))
801
802 (disallow-cl-function-in-transaction get-internal-run-time)
803 (disallow-cl-function-in-transaction get-internal-real-time)
804 (disallow-cl-function-in-transaction sleep)
805
806 (defun get-universal-time ()
807   (if (in-transaction-p)
808       (transaction-timestamp *current-transaction*)
809       (common-lisp::get-universal-time)))
810 |#
Note: See TracBrowser for help on using the browser.