1
1
package com .lowtuna .jsonblob .core ;
2
2
3
- import com .codahale .metrics .Gauge ;
4
- import com .codahale .metrics .MetricRegistry ;
3
+ import com .fasterxml .jackson .core .JsonParseException ;
4
+ import com .fasterxml .jackson .databind .JsonMappingException ;
5
+ import com .fasterxml .jackson .databind .ObjectMapper ;
6
+ import com .google .common .base .Optional ;
5
7
import com .google .common .base .Stopwatch ;
6
8
import io .dropwizard .util .Duration ;
7
9
import lombok .extern .slf4j .Slf4j ;
8
10
import org .apache .commons .io .DirectoryWalker ;
11
+ import org .joda .time .DateTime ;
9
12
10
13
import java .io .File ;
11
14
import java .io .IOException ;
12
15
import java .nio .file .Files ;
13
16
import java .nio .file .Path ;
14
17
import java .time .LocalDate ;
15
18
import java .util .Collection ;
16
- import java .util .concurrent .BlockingQueue ;
17
19
import java .util .concurrent .TimeUnit ;
18
20
import java .util .concurrent .atomic .AtomicInteger ;
19
21
24
26
public class BlobCleanupProducer extends DirectoryWalker <Void > implements Runnable {
25
27
private final Path dataDirectoryPath ;
26
28
private final Duration blobAccessTtl ;
27
- private final BlockingQueue <File > filesToProcess ;
29
+ private final FileSystemJsonBlobManager fileSystemJsonBlobManager ;
30
+ private final ObjectMapper om ;
28
31
29
- public BlobCleanupProducer (Path dataDirectoryPath , Duration blobAccessTtl , BlockingQueue < File > filesToProcess , MetricRegistry metricRegistry ) {
32
+ public BlobCleanupProducer (Path dataDirectoryPath , Duration blobAccessTtl , FileSystemJsonBlobManager fileSystemJsonBlobManager , ObjectMapper om ) {
30
33
super (null , 3 );
31
34
this .dataDirectoryPath = dataDirectoryPath ;
32
35
this .blobAccessTtl = blobAccessTtl ;
33
- this .filesToProcess = filesToProcess ;
34
- metricRegistry . register ( MetricRegistry . name ( getClass (), "filesToProcessCount" ), ( Gauge < Integer >) () -> filesToProcess . size ()) ;
36
+ this .fileSystemJsonBlobManager = fileSystemJsonBlobManager ;
37
+ this . om = om ;
35
38
}
36
39
37
40
@@ -51,10 +54,41 @@ protected boolean handleDirectory(File directory, int depth, Collection<Void> re
51
54
if (file .getName ().startsWith (FileSystemJsonBlobManager .BLOB_METADATA_FILE_NAME )) {
52
55
return ;
53
56
}
57
+
54
58
try {
55
- filesToProcess .put (file );
56
- } catch (InterruptedException e ) {
57
- log .warn ("Interrupted while trying to add file to be processed at {}" , file .getAbsolutePath (), e );
59
+ log .debug ("Processing {}" , file .getAbsolutePath ());
60
+ String blobId = file .getName ().split ("\\ ." , 2 )[0 ];
61
+ File metadataFile = fileSystemJsonBlobManager .getMetaDataFile (file .getParentFile ());
62
+
63
+ if (file .equals (metadataFile )) {
64
+ return ;
65
+ }
66
+
67
+ BlobMetadataContainer metadataContainer = metadataFile .exists () ? om .readValue (fileSystemJsonBlobManager .readFile (metadataFile ), BlobMetadataContainer .class ) : new BlobMetadataContainer ();
68
+
69
+ Optional <DateTime > lastAccessed = fileSystemJsonBlobManager .resolveTimestamp (blobId );
70
+ if (metadataContainer .getLastAccessedByBlobId ().containsKey (blobId )) {
71
+ lastAccessed = Optional .of (metadataContainer .getLastAccessedByBlobId ().get (blobId ));
72
+ }
73
+
74
+ if (!lastAccessed .isPresent ()) {
75
+ log .warn ("Couldn't get last accessed timestamp for blob {}" , blobId );
76
+ return ;
77
+ }
78
+
79
+ log .debug ("Blob {} was last accessed {}" , blobId , lastAccessed .get ());
80
+
81
+ if (lastAccessed .get ().plusMillis ((int ) blobAccessTtl .toMilliseconds ()).isBefore (DateTime .now ())) {
82
+ if (file .delete ()) {
83
+ log .info ("Blob {} hasn't been accessed in {} (last accessed {}), so it's going to be deleted" , blobId , blobAccessTtl , lastAccessed .get ());
84
+ }
85
+ }
86
+ } catch (JsonParseException e ) {
87
+ log .warn ("Couldn't parse JSON from BlobMetadataContainer" , e );
88
+ } catch (JsonMappingException e ) {
89
+ log .warn ("Couldn't map JSON from BlobMetadataContainer" , e );
90
+ } catch (IOException e ) {
91
+ log .warn ("Couldn't read json for BlobMetadataContainer file" , e );
58
92
}
59
93
});
60
94
0 commit comments