summaryrefslogtreecommitdiff
path: root/src/os/IndexManager.cc
blob: c04e1edb47fcd513820b7d0b610c4f7fe624b4ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software 
 * Foundation.  See file COPYING.
 * 
 */

#include <errno.h>

#if defined(__FreeBSD__)
#include <sys/param.h>
#endif

#include "include/buffer.h"
#include "global/global_context.h"
#include "global/debug.h"

#include "chain_xattr.h"

#include "IndexManager.h"

static int set_version(const char *path, uint32_t version) {
  bufferlist bl;
  ::encode(version, bl);
  return chain_setxattr(path, "user.cephos.collection_version", bl.c_str(), 
		     bl.length());
}

static int get_version(const char *path, uint32_t *version) {
  bufferptr bp(PATH_MAX);
  int r = chain_getxattr(path, "user.cephos.collection_version", 
		      bp.c_str(), bp.length());
  if (r < 0) {
    if (r != -ENOENT) {
      *version = 0;
      return 0;
    } else {
      return r;
    }
  }
  bp.set_length(r);
  bufferlist bl;
  bl.push_back(bp);
  bufferlist::iterator i = bl.begin();
  ::decode(*version, i);
  return 0;
}

/**
 * Drop the entry for collection c from col_indices and signal cond.
 *
 * Takes the manager lock for the duration of the call.  The collection is
 * asserted to be present in col_indices.
 *
 * @param c  collection whose index entry is being removed
 */
void IndexManager::put_index(coll_t c) {
  Mutex::Locker guard(lock);
  assert(col_indices.count(c) > 0);
  col_indices.erase(c);
  // get_index() waits on this condition while an entry for c exists.
  cond.Signal();
}

/**
 * Initialize a collection at path: record its version attribute, then
 * run HashIndex::init() on it.
 *
 * Holds the manager lock for the duration of the call.
 *
 * @param c        collection being initialized
 * @param path     filesystem path of the collection directory
 * @param version  version tag written to the collection's xattr
 * @return negative value from set_version() on failure, otherwise the
 *         result of HashIndex::init()
 *
 * NOTE(review): the HashIndex is constructed with HASH_INDEX_TAG_2 rather
 * than the version that was just persisted -- confirm this is intended.
 */
int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
  Mutex::Locker guard(lock);

  const int rc = set_version(path, version);
  if (rc < 0) {
    return rc;
  }

  HashIndex hash_index(c, path,
                       g_conf->filestore_merge_threshold,
                       g_conf->filestore_split_multiple,
                       CollectionIndex::HASH_INDEX_TAG_2,
                       g_conf->filestore_index_retry_probability);
  return hash_index.init();
}

/**
 * Construct the index object for collection c and store it in *index.
 *
 * When the upgrade flag is not set, a HashIndex tagged HOBJECT_WITH_POOL is
 * created without consulting the on-disk version.  When it is set, the
 * version attribute is read with get_version() and the matching index type
 * is created: FlatIndex for FLAT_INDEX_TAG, HashIndex (carrying the on-disk
 * version) for HASH_INDEX_TAG, HASH_INDEX_TAG_2 and HOBJECT_WITH_POOL.
 *
 * @param c      collection to build an index for
 * @param path   filesystem path of the collection directory
 * @param index  out: receives the newly created index
 * @return 0 on success; a negative value from get_version() on failure;
 *         -EINVAL for an unrecognized version if the assert does not abort
 */
int IndexManager::build_index(coll_t c, const char *path, Index *index) {
  if (!upgrade) {
    // No need to check the on-disk version
    *index = Index(new HashIndex(c, path, g_conf->filestore_merge_threshold,
                                 g_conf->filestore_split_multiple,
                                 CollectionIndex::HOBJECT_WITH_POOL,
                                 g_conf->filestore_index_retry_probability),
                   RemoveOnDelete(c, this));
    return 0;
  }

  // Need to check the collection generation
  uint32_t version = 0;
  int r = get_version(path, &version);
  if (r < 0)
    return r;

  switch (version) {
  case CollectionIndex::FLAT_INDEX_TAG: {
    *index = Index(new FlatIndex(c, path),
                   RemoveOnDelete(c, this));
    return 0;
  }
  case CollectionIndex::HASH_INDEX_TAG: // fall through
  case CollectionIndex::HASH_INDEX_TAG_2: // fall through
  case CollectionIndex::HOBJECT_WITH_POOL: {
    // Must be a HashIndex
    *index = Index(new HashIndex(c, path, g_conf->filestore_merge_threshold,
                                 g_conf->filestore_split_multiple, version),
                   RemoveOnDelete(c, this));
    return 0;
  }
  default:
    // Unknown on-disk version.  The explicit return keeps every path of
    // this non-void function returning a value even if the assert is
    // compiled out or is not treated as noreturn.
    assert(0);
    return -EINVAL;
  }
}

/**
 * Obtain the index for collection c, building it via build_index().
 *
 * Holds the manager lock.  While col_indices already contains an entry for
 * c, the call blocks on cond; once no entry exists, a new index is built,
 * given a reference to itself via set_ref(), and recorded in col_indices.
 *
 * @param c      collection to look up
 * @param path   filesystem path of the collection directory
 * @param index  out: receives the index for c
 * @return 0 on success, or the negative value returned by build_index()
 */
int IndexManager::get_index(coll_t c, const char *path, Index *index) {
  Mutex::Locker guard(lock);

  // Wait until nobody else holds an index for this collection.
  while (col_indices.count(c)) {
    cond.Wait(lock);
  }

  const int rc = build_index(c, path, index);
  if (rc < 0) {
    return rc;
  }

  (*index)->set_ref(*index);
  col_indices[c] = (*index);
  return 0;
}